You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2017/05/12 00:53:48 UTC
reef git commit: [REEF-1795] Implement REEF-on-Spark example
Repository: reef
Updated Branches:
refs/heads/master 49c1414d6 -> 39adc451b
[REEF-1795] Implement REEF-on-Spark example
JIRA:
[REEF-1795](https://issues.apache.org/jira/browse/REEF-1795)
Pull Request:
This closes #1302
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/39adc451
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/39adc451
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/39adc451
Branch: refs/heads/master
Commit: 39adc451b9799f3441000a7cb37764b1f66d5375
Parents: 49c1414
Author: Sergiy Matusevych <mo...@apache.org>
Authored: Wed May 3 16:47:06 2017 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Thu May 11 17:52:31 2017 -0700
----------------------------------------------------------------------
lang/scala/reef-examples-scala/pom.xml | 112 +++++++++++++++++++
.../reef/examples/hellospark/ReefOnSpark.scala | 83 ++++++++++++++
.../examples/hellospark/ReefOnSparkDriver.scala | 71 ++++++++++++
.../examples/hellospark/ReefOnSparkTask.scala | 39 +++++++
pom.xml | 32 +++++-
5 files changed, 336 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/39adc451/lang/scala/reef-examples-scala/pom.xml
----------------------------------------------------------------------
diff --git a/lang/scala/reef-examples-scala/pom.xml b/lang/scala/reef-examples-scala/pom.xml
new file mode 100644
index 0000000..2e2d07d
--- /dev/null
+++ b/lang/scala/reef-examples-scala/pom.xml
@@ -0,0 +1,112 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>reef-examples-spark</artifactId>
+  <name>REEF Spark Examples</name>
+
+  <parent>
+    <groupId>org.apache.reef</groupId>
+    <artifactId>reef-project</artifactId>
+    <version>0.16.0-SNAPSHOT</version>
+    <relativePath>../../..</relativePath>
+  </parent>
+
+  <properties>
+    <rootPath>${basedir}/../../..</rootPath>
+    <!-- Findbugs does not support Scala code -->
+    <findbugs.skip>true</findbugs.skip>
+  </properties>
+
+  <dependencies>
+    <!-- REEF -->
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>reef-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>reef-runtime-local</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>reef-runtime-yarn</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>reef-io</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <!-- End of REEF -->
+    <!-- Spark: no version here, so it is supplied by the parent POM. -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_2.11</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <!-- End of Spark -->
+    <!-- scala-arm provides the `managed` resource helper used by the example code. -->
+    <dependency>
+      <groupId>com.jsuereth</groupId>
+      <artifactId>scala-arm_2.11</artifactId>
+      <version>2.0</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <sourceDirectory>src/main/scala</sourceDirectory>
+    <plugins>
+      <!--
+        Activate Scala compilation. The parent POM only lists this plugin under
+        pluginManagement, which configures (version, goals, -nobootcp) but does not
+        bind it to the build; without this declaration nothing under src/main/scala
+        is compiled.
+      -->
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+      </plugin>
+      <!-- Build a self-contained jar suitable for spark-submit. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <outputFile>
+            ${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar
+          </outputFile>
+          <filters>
+            <filter>
+              <artifact>*:*</artifact>
+              <!--
+                NOTE(review): presumably stripped to avoid clashing with the cluster's own
+                Hadoop/YARN defaults and with signed-jar metadata; confirm before changing.
+              -->
+              <excludes>
+                <exclude>yarn-default.xml</exclude>
+                <exclude>yarn-version-info.properties</exclude>
+                <exclude>core-default.xml</exclude>
+                <exclude>LICENSE</exclude>
+                <exclude>META-INF/*</exclude>
+              </excludes>
+            </filter>
+          </filters>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/reef/blob/39adc451/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSpark.scala
----------------------------------------------------------------------
diff --git a/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSpark.scala b/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSpark.scala
new file mode 100644
index 0000000..033607e
--- /dev/null
+++ b/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSpark.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.hellospark
+
+import java.util.logging.{Level, Logger}
+
+import org.apache.reef.client.{DriverConfiguration, DriverLauncher}
+import org.apache.reef.runtime.common.REEFEnvironment
+import org.apache.reef.runtime.yarn.client.unmanaged.UnmanagedAmYarnClientConfiguration
+import org.apache.reef.runtime.yarn.client.unmanaged.UnmanagedAmYarnDriverConfiguration
+import org.apache.reef.util.EnvironmentUtils
+import org.apache.spark.{SparkConf, SparkContext}
+import resource._
+
+/**
+ * Hello-world example that launches a REEF job from inside a Spark application.
+ *
+ * The Spark application is the host: it submits a REEF driver to YARN using the
+ * `UnmanagedAmYarn*` configurations, runs that driver in this JVM through
+ * [[REEFEnvironment]], and finally stops its SparkContext.
+ *
+ * Run (Windows syntax; a single command, wrapped here for readability):
+ * {{{
+ * ..\spark\bin\spark-submit.cmd
+ *   --master yarn --deploy-mode cluster
+ *   --class org.apache.reef.examples.hellospark.ReefOnSpark
+ *   .\target\reef-examples-spark-0.16.0-SNAPSHOT-shaded.jar
+ * }}}
+ */
+object ReefOnSpark {
+
+  private val LOG: Logger = Logger.getLogger(this.getClass.getName)
+
+  /** Folder used both as the REEF client root folder and as the job submission directory. */
+  private val rootFolder = "."
+
+  /**
+   * Wait time passed to `DriverLauncher.submit`.
+   * NOTE(review): presumably milliseconds (i.e. 2 minutes); confirm against the REEF API.
+   */
+  private val SubmitTimeout = 120000
+
+  /** Client-side runtime configuration for submitting the unmanaged-AM YARN job. */
+  private val runtimeConfig = UnmanagedAmYarnClientConfiguration.CONF
+    .set(UnmanagedAmYarnClientConfiguration.ROOT_FOLDER, rootFolder)
+    .build
+
+  /**
+   * Entry point invoked by `spark-submit`.
+   *
+   * Creates a SparkContext, submits the REEF driver, runs it in-process, and logs the
+   * final job state. The SparkContext is stopped even if submission or execution throws.
+   *
+   * @param args command-line arguments (unused).
+   */
+  def main(args: Array[String]) {
+
+    LOG.setLevel(Level.FINEST)
+
+    val conf = new SparkConf().setAppName("ReefOnSpark:host")
+    val sc = new SparkContext(conf)
+
+    try {
+      // scala-arm's `managed` closes the launcher when the block exits.
+      for (client <- managed(DriverLauncher.getLauncher(runtimeConfig))) {
+
+        // Ship the jar that contains the driver/task classes with the REEF job.
+        val jarPath = EnvironmentUtils.getClassLocation(classOf[ReefOnSparkDriver])
+
+        val driverConfig = DriverConfiguration.CONF
+          .set(DriverConfiguration.DRIVER_IDENTIFIER, "ReefOnSpark:hello")
+          .set(DriverConfiguration.GLOBAL_LIBRARIES, jarPath)
+          .set(DriverConfiguration.ON_DRIVER_STARTED, classOf[ReefOnSparkDriver#StartHandler])
+          .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, classOf[ReefOnSparkDriver#EvaluatorAllocatedHandler])
+          .build
+
+        val appId = client.submit(driverConfig, SubmitTimeout)
+
+        LOG.log(Level.INFO, "Job submitted: {0} to {1}", Array[AnyRef](appId, jarPath))
+
+        // Driver-side configuration pointing at the application that was just submitted.
+        val yarnAmConfig = UnmanagedAmYarnDriverConfiguration.CONF
+          .set(UnmanagedAmYarnDriverConfiguration.JOB_IDENTIFIER, appId)
+          .set(UnmanagedAmYarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, rootFolder)
+          .build
+
+        // Run the REEF driver in this JVM; the environment is closed when the block exits.
+        for (reef <- managed(REEFEnvironment.fromConfiguration(client.getUser, yarnAmConfig, driverConfig))) {
+          reef.run()
+          val status = reef.getLastStatus
+          LOG.log(Level.INFO, "REEF job {0} completed: state {1}", Array[AnyRef](appId, status.getState))
+        }
+      }
+    } finally {
+      // Always release Spark resources, even when the REEF part failed.
+      sc.stop()
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/39adc451/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSparkDriver.scala
----------------------------------------------------------------------
diff --git a/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSparkDriver.scala b/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSparkDriver.scala
new file mode 100644
index 0000000..a77ff07
--- /dev/null
+++ b/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSparkDriver.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.hellospark
+
+import org.apache.reef.driver.evaluator.AllocatedEvaluator
+import org.apache.reef.driver.evaluator.EvaluatorRequestor
+import org.apache.reef.driver.task.TaskConfiguration
+import org.apache.reef.tang.Configuration
+import org.apache.reef.tang.annotations.Unit
+import org.apache.reef.wake.EventHandler
+import org.apache.reef.wake.time.event.StartTime
+
+import javax.inject.Inject
+import java.util.logging.Level
+import java.util.logging.Logger
+
+/** Companion object holding the logger shared by the driver and its handlers. */
+object ReefOnSparkDriver {
+  private val LOG: Logger = Logger.getLogger(classOf[ReefOnSparkDriver].getName)
+}
+
+/**
+ * REEF driver for the REEF-on-Spark example.
+ *
+ * When the driver starts it requests one evaluator (64 MB, 1 core). When that evaluator
+ * is allocated it submits a [[ReefOnSparkTask]] to it.
+ *
+ * Note: this file imports Tang's `@Unit` annotation, which shadows `scala.Unit`;
+ * result types below are therefore written as `scala.Unit`.
+ *
+ * @param requestor used to request evaluators for the job.
+ */
+@Unit
+final class ReefOnSparkDriver @Inject private(val requestor: EvaluatorRequestor) {
+
+  import ReefOnSparkDriver.LOG
+
+  LOG.log(Level.FINE, "Instantiated ReefOnSparkDriver")
+
+  /** Driver-start handler: asks for a single small evaluator. */
+  final class StartHandler extends EventHandler[StartTime] {
+    override def onNext(startTime: StartTime): scala.Unit = {
+      LOG.log(Level.INFO, "Start ReefOnSparkDriver: {0}", startTime)
+
+      val request = requestor.newRequest
+        .setNumber(1)
+        .setMemory(64)
+        .setNumberOfCores(1)
+      request.submit()
+
+      LOG.log(Level.INFO, "Requested Evaluator.")
+    }
+  }
+
+  /** Evaluator-allocation handler: launches [[ReefOnSparkTask]] on the new evaluator. */
+  final class EvaluatorAllocatedHandler extends EventHandler[AllocatedEvaluator] {
+    override def onNext(allocatedEvaluator: AllocatedEvaluator): scala.Unit = {
+      LOG.log(Level.INFO,
+        "Submitting ReefOnSparkTask task to AllocatedEvaluator: {0}", allocatedEvaluator)
+
+      allocatedEvaluator.submitTask(
+        TaskConfiguration.CONF
+          .set(TaskConfiguration.IDENTIFIER, "ReefOnSparkTask")
+          .set(TaskConfiguration.TASK, classOf[ReefOnSparkTask])
+          .build)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/39adc451/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSparkTask.scala
----------------------------------------------------------------------
diff --git a/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSparkTask.scala b/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSparkTask.scala
new file mode 100644
index 0000000..934790d
--- /dev/null
+++ b/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSparkTask.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.hellospark
+
+import javax.inject.Inject
+import java.util.logging.Level
+import java.util.logging.Logger
+
+import org.apache.reef.task.Task
+
+/** Companion object holding the logger for [[ReefOnSparkTask]]. */
+object ReefOnSparkTask {
+  private val LOG: Logger = Logger.getLogger(classOf[ReefOnSparkTask].getName)
+}
+
+/** Minimal REEF task for the REEF-on-Spark example: logs a greeting and finishes. */
+final class ReefOnSparkTask @Inject() extends Task {
+
+  import ReefOnSparkTask.LOG
+
+  LOG.log(Level.FINE, "Instantiated ReefOnSparkTask")
+
+  /**
+   * Logs "Hello Spark!" at INFO level.
+   *
+   * @param bytes input handed to the task by REEF; ignored.
+   * @return always `null`, since this task produces no result.
+   */
+  override def call(bytes: Array[Byte]): Array[Byte] = {
+    LOG.log(Level.INFO, "Hello Spark!")
+    val noResult: Array[Byte] = null
+    noResult
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/39adc451/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6e41a3e..5940dad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,6 +51,7 @@ under the License.
<bundle.snappy>false</bundle.snappy>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop.version>2.6.0</hadoop.version>
+ <spark.version>2.1.0</spark.version>
<avro.version>1.8.1</avro.version>
<parquet.version>1.9.0</parquet.version>
<jetty.version>6.1.26</jetty.version>
@@ -275,7 +276,7 @@ under the License.
<exclude>.gitignore</exclude>
<exclude>.git/**</exclude>
<!-- Intellij idea project files -->
- <exclude>lang/java/.idea/**</exclude>
+ <exclude>**/.idea/**</exclude>
<exclude>**/*.iml</exclude>
<exclude>**/target/**</exclude>
<!-- ReadMe files -->
@@ -402,6 +403,25 @@ under the License.
</excludes>
</configuration>
</plugin>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.2.1</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <args>
+ <!-- work-around for https://issues.scala-lang.org/browse/SI-8358 -->
+ <arg>-nobootcp</arg>
+ </args>
+ </configuration>
+ </plugin>
</plugins>
</pluginManagement>
<plugins>
@@ -611,6 +631,15 @@ under the License.
</dependency>
<!-- END OF HADOOP -->
+ <!-- Spark -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.11</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- End of Spark -->
+
<!-- Apache Commons -->
<dependency>
<groupId>commons-cli</groupId>
@@ -751,6 +780,7 @@ under the License.
<module>lang/java/reef-webserver</module>
<module>lang/java/reef-utils-hadoop</module>
<module>lang/java/reef-utils</module>
+ <module>lang/scala/reef-examples-scala</module>
<module>website</module>
</modules>