You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:04 UTC
[31/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/pom.xml b/lang/java/reef-examples/pom.xml
new file mode 100644
index 0000000..b2eb3b7
--- /dev/null
+++ b/lang/java/reef-examples/pom.xml
@@ -0,0 +1,365 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>reef-examples</artifactId>
+ <name>REEF Examples</name>
+
+ <parent>
+ <groupId>org.apache.reef</groupId>
+ <artifactId>reef-project</artifactId>
+ <version>0.11.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <dependencies>
+ <!-- REEF -->
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-runtime-local</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-runtime-yarn</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-runtime-mesos</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-io</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-checkpoint</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-webserver</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-poison</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- End of REEF -->
+
+ <!-- HADOOP -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- End of HADOOP -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+
+ <build>
+ <plugins>
+ <!-- Runs the shade goal in the package phase to produce one combined jar,
+ written to the outputFile configured below. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <outputFile>
+ ${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar
+ </outputFile>
+ <filters>
+ <!-- Applies to every artifact (*:*): the resources listed under excludes are
+ left out of the shaded jar. NOTE(review): presumably so that the Hadoop/YARN
+ default files come from the cluster installation instead; confirm. -->
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>yarn-default.xml</exclude>
+ <exclude>yarn-version-info.properties</exclude>
+ <exclude>core-default.xml</exclude>
+ <exclude>LICENSE</exclude>
+ <exclude>META-INF/*</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+
+ <profiles>
+
+ <!-- Profile HelloREEF: makes exec:exec the default goal and starts the java
+ executable with the project classpath and the main class
+ org.apache.reef.examples.hello.HelloREEF.
+ NOTE(review): the local.folder system property below still carries the
+ com.microsoft.reef prefix although the classes are in org.apache.reef;
+ confirm the runtime still reads this name. -->
+ <profile>
+ <id>HelloREEF</id>
+ <build>
+ <defaultGoal>exec:exec</defaultGoal>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <configuration>
+ <executable>java</executable>
+ <arguments>
+ <argument>-classpath</argument>
+ <classpath/>
+ <argument>-Djava.util.logging.config.class=org.apache.reef.util.logging.Config
+ </argument>
+ <!-- <argument>-Dlog4j.debug=true</argument> -->
+ <argument>-Dcom.microsoft.reef.runtime.local.folder=${project.build.directory}
+ </argument>
+ <argument>org.apache.reef.examples.hello.HelloREEF</argument>
+ </arguments>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <!-- Profile HelloREEFHttp: makes exec:exec the default goal and starts the java
+ executable with the project classpath and the main class
+ org.apache.reef.examples.hellohttp.HelloREEFHttp. -->
+ <profile>
+ <id>HelloREEFHttp</id>
+ <build>
+ <defaultGoal>exec:exec</defaultGoal>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <configuration>
+ <executable>java</executable>
+ <arguments>
+ <argument>-classpath</argument>
+ <classpath/>
+ <argument>-Djava.util.logging.config.class=org.apache.reef.util.logging.Config
+ </argument>
+ <!-- <argument>-Dlog4j.debug=true</argument> -->
+ <argument>-Dcom.microsoft.reef.runtime.local.folder=${project.build.directory}
+ </argument>
+ <argument>org.apache.reef.examples.hellohttp.HelloREEFHttp</argument>
+ </arguments>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <!-- Profile HelloREEFNoClient: makes exec:exec the default goal and starts the
+ java executable with the project classpath and the main class
+ org.apache.reef.examples.hello.HelloREEFNoClient. -->
+ <profile>
+ <id>HelloREEFNoClient</id>
+ <build>
+ <defaultGoal>exec:exec</defaultGoal>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <configuration>
+ <executable>java</executable>
+ <arguments>
+ <argument>-classpath</argument>
+ <classpath/>
+ <argument>-Djava.util.logging.config.class=org.apache.reef.util.logging.Config
+ </argument>
+ <!-- <argument>-Dlog4j.debug=true</argument> -->
+ <argument>-Dcom.microsoft.reef.runtime.local.folder=${project.build.directory}
+ </argument>
+ <argument>org.apache.reef.examples.hello.HelloREEFNoClient</argument>
+ </arguments>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <!-- Profile MatMult: makes exec:exec the default goal and starts the java
+ executable with the project classpath and the main class
+ org.apache.reef.examples.groupcomm.matmul.MatMultREEF. -->
+ <profile>
+ <id>MatMult</id>
+ <build>
+ <defaultGoal>exec:exec</defaultGoal>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <configuration>
+ <executable>java</executable>
+ <arguments>
+ <argument>-classpath</argument>
+ <classpath/>
+ <argument>-Djava.util.logging.config.class=org.apache.reef.util.logging.Config
+ </argument>
+ <argument>-Dcom.microsoft.reef.runtime.local.folder=${project.build.directory}
+ </argument>
+ <argument>org.apache.reef.examples.groupcomm.matmul.MatMultREEF</argument>
+ </arguments>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
+ <!-- Profile RetainedEval: makes exec:exec the default goal and starts the java
+ executable with the project classpath and the main class
+ org.apache.reef.examples.retained_eval.Launch. The -cmd, -num_runs and -local
+ arguments are commented out below, so none of them is passed to the program. -->
+ <profile>
+ <id>RetainedEval</id>
+ <build>
+ <defaultGoal>exec:exec</defaultGoal>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <configuration>
+ <executable>java</executable>
+ <arguments>
+ <argument>-classpath</argument>
+ <classpath/>
+ <argument>-Djava.util.logging.config.class=org.apache.reef.util.logging.Config
+ </argument>
+ <argument>-Dcom.microsoft.reef.runtime.local.folder=${project.build.directory}
+ </argument>
+ <argument>org.apache.reef.examples.retained_eval.Launch</argument>
+ <!-- <argument>-cmd</argument>
+ <argument>date</argument>
+ <argument>-num_runs</argument>
+ <argument>20</argument>
+ <argument>-local</argument>
+ <argument>true</argument> -->
+ </arguments>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <!-- Profile RetainedEval_yarn: makes exec:exec the default goal and starts
+ org.apache.reef.examples.retained_eval.Launch with the program arguments
+ "-cmd date -num_runs 20 -local false". Unlike the RetainedEval profile it does
+ not set the local.folder system property. NOTE(review): going by the profile
+ id and "-local false" this is meant for a YARN cluster; confirm. -->
+ <profile>
+ <id>RetainedEval_yarn</id>
+ <build>
+ <defaultGoal>exec:exec</defaultGoal>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <configuration>
+ <executable>java</executable>
+ <arguments>
+ <argument>-classpath</argument>
+ <classpath/>
+ <argument>-Djava.util.logging.config.class=org.apache.reef.util.logging.Config
+ </argument>
+ <argument>org.apache.reef.examples.retained_eval.Launch</argument>
+ <argument>-cmd</argument>
+ <argument>date</argument>
+ <argument>-num_runs</argument>
+ <argument>20</argument>
+ <argument>-local</argument>
+ <argument>false</argument>
+ </arguments>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
+ <!-- Profile SuspendDemo: makes exec:exec the default goal and starts
+ org.apache.reef.examples.suspend.Launch on the project classpath with the
+ program arguments "-delay 1 -cycles 20 -local true". -->
+ <profile>
+ <id>SuspendDemo</id>
+ <build>
+ <defaultGoal>exec:exec</defaultGoal>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <configuration>
+ <executable>java</executable>
+ <arguments>
+ <argument>-classpath</argument>
+ <classpath/>
+ <argument>-Dcom.microsoft.reef.runtime.local.folder=${project.build.directory}
+ </argument>
+ <argument>-Djava.util.logging.config.class=org.apache.reef.util.logging.Config
+ </argument>
+ <argument>org.apache.reef.examples.suspend.Launch</argument>
+ <argument>-delay</argument>
+ <argument>1</argument>
+ <argument>-cycles</argument>
+ <argument>20</argument>
+ <argument>-local</argument>
+ <argument>true</argument>
+ </arguments>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
+ <!-- Profile Pool: makes exec:exec the default goal and starts
+ org.apache.reef.examples.pool.Launch on the project classpath with the
+ program arguments "-evaluators 4 -tasks 100 -delay 1 -local true". -->
+ <profile>
+ <id>Pool</id>
+ <build>
+ <defaultGoal>exec:exec</defaultGoal>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <configuration>
+ <executable>java</executable>
+ <arguments>
+ <argument>-classpath</argument>
+ <classpath/>
+ <argument>-Dcom.microsoft.reef.runtime.local.folder=${project.build.directory}
+ </argument>
+ <argument>-Djava.util.logging.config.class=org.apache.reef.util.logging.Config
+ </argument>
+ <argument>org.apache.reef.examples.pool.Launch</argument>
+ <argument>-evaluators</argument>
+ <argument>4</argument>
+ <argument>-tasks</argument>
+ <argument>100</argument>
+ <argument>-delay</argument>
+ <argument>1</argument>
+ <argument>-local</argument>
+ <argument>true</argument>
+ </arguments>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/loading/DataLoadingREEF.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/loading/DataLoadingREEF.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/loading/DataLoadingREEF.java
new file mode 100644
index 0000000..a6e7544
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/loading/DataLoadingREEF.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.data.loading;
+
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.io.data.loading.api.DataLoadingRequestBuilder;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.CommandLine;
+import org.apache.reef.util.EnvironmentUtils;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Client that launches the data loading demo application.
+ * <p>
+ * The named parameters {@link Local}, {@link TimeOut} and {@link InputDir} are read from the
+ * command line through Tang. Depending on {@link Local}, the job is submitted either to the
+ * local runtime or to YARN, with {@link LineCounter} providing the driver-side handlers.
+ */
+@ClientSide
+public class DataLoadingREEF {
+
+  private static final Logger LOG = Logger.getLogger(DataLoadingREEF.class.getName());
+
+  /** Thread count handed to the local runtime. */
+  private static final int NUM_LOCAL_THREADS = 16;
+
+  /** Number of splits requested from the data loading service. */
+  private static final int NUM_SPLITS = 6;
+
+  /** Number of evaluators asked for in the compute request. */
+  private static final int NUM_COMPUTE_EVALUATORS = 2;
+
+  /**
+   * Command line parameter = true to run locally, or false to run on YARN.
+   */
+  @NamedParameter(doc = "Whether or not to run on the local runtime",
+      short_name = "local", default_value = "true")
+  public static final class Local implements Name<Boolean> {
+  }
+
+  /**
+   * Command line parameter: job timeout, expressed in minutes.
+   */
+  @NamedParameter(doc = "Number of minutes before timeout",
+      short_name = "timeout", default_value = "2")
+  public static final class TimeOut implements Name<Integer> {
+  }
+
+  /**
+   * Command line parameter: the input path handed to the data loading request.
+   * It declares no default value.
+   */
+  @NamedParameter(short_name = "input")
+  public static final class InputDir implements Name<String> {
+  }
+
+  /**
+   * Parses the command line, assembles the runtime and data loading configurations
+   * and runs the job until it finishes or the timeout expires.
+   *
+   * @param args command line parameters.
+   * @throws InjectionException configuration error.
+   * @throws BindException      configuration error.
+   * @throws IOException        declared by the command line processing.
+   */
+  public static void main(final String[] args)
+      throws InjectionException, BindException, IOException {
+
+    final Tang tang = Tang.Factory.getTang();
+    final JavaConfigurationBuilder cmdLineBuilder = tang.newConfigurationBuilder();
+
+    new CommandLine(cmdLineBuilder)
+        .registerShortNameOfClass(Local.class)
+        .registerShortNameOfClass(TimeOut.class)
+        .registerShortNameOfClass(InputDir.class)
+        .processCommandLine(args);
+
+    final Injector cmdLineInjector = tang.newInjector(cmdLineBuilder.build());
+
+    final boolean runLocally = cmdLineInjector.getNamedInstance(Local.class);
+    final int timeoutMinutes = cmdLineInjector.getNamedInstance(TimeOut.class);
+    // Minutes -> milliseconds, in int arithmetic.
+    final int timeoutMillis = timeoutMinutes * 60 * 1000;
+    final String inputPath = cmdLineInjector.getNamedInstance(InputDir.class);
+
+    final Configuration runtimeConf = createRuntimeConfiguration(runLocally);
+
+    final EvaluatorRequest computeEvaluators = EvaluatorRequest.newBuilder()
+        .setNumber(NUM_COMPUTE_EVALUATORS)
+        .setMemory(512)
+        .setNumberOfCores(1)
+        .build();
+
+    final Configuration driverConf = new DataLoadingRequestBuilder()
+        .setMemoryMB(1024)
+        .setInputFormatClass(TextInputFormat.class)
+        .setInputPath(inputPath)
+        .setNumberOfDesiredSplits(NUM_SPLITS)
+        .setComputeRequest(computeEvaluators)
+        .setDriverConfigurationModule(DriverConfiguration.CONF
+            .set(DriverConfiguration.GLOBAL_LIBRARIES,
+                EnvironmentUtils.getClassLocation(LineCounter.class))
+            .set(DriverConfiguration.ON_CONTEXT_ACTIVE, LineCounter.ContextActiveHandler.class)
+            .set(DriverConfiguration.ON_TASK_COMPLETED, LineCounter.TaskCompletedHandler.class)
+            .set(DriverConfiguration.DRIVER_IDENTIFIER, "DataLoadingREEF"))
+        .build();
+
+    final LauncherStatus finalStatus =
+        DriverLauncher.getLauncher(runtimeConf).run(driverConf, timeoutMillis);
+
+    LOG.log(Level.INFO, "REEF job completed: {0}", finalStatus);
+  }
+
+  /**
+   * Builds the runtime configuration: local runtime with {@link #NUM_LOCAL_THREADS} threads,
+   * or the YARN client configuration.
+   *
+   * @param runLocally true for the local runtime, false for YARN.
+   * @return the runtime configuration to hand to the launcher.
+   * @throws BindException configuration error.
+   */
+  private static Configuration createRuntimeConfiguration(final boolean runLocally)
+      throws BindException {
+    if (!runLocally) {
+      LOG.log(Level.INFO, "Running Data Loading demo on YARN");
+      return YarnClientConfiguration.CONF.build();
+    }
+    LOG.log(Level.INFO, "Running Data Loading demo on the local runtime");
+    return LocalRuntimeConfiguration.CONF
+        .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, NUM_LOCAL_THREADS)
+        .build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/loading/LineCounter.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/loading/LineCounter.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/loading/LineCounter.java
new file mode 100644
index 0000000..4b88bd1
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/loading/LineCounter.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.data.loading;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.io.data.loading.api.DataLoadingService;
+import org.apache.reef.poison.PoisonedConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver side for the line counting demo that uses the data loading service.
+ * <p>
+ * {@link ContextActiveHandler} stacks a "LineCountCtxt-N" context on every data-loaded
+ * context and submits a {@link LineCountingTask} to each such context.
+ * {@link TaskCompletedHandler} adds up the counts the tasks return and logs the total
+ * once the expected number of tasks has completed.
+ */
+@DriverSide
+@Unit
+public class LineCounter {
+
+  private static final Logger LOG = Logger.getLogger(LineCounter.class.getName());
+
+  /** Source of the numeric suffixes used for both context and task identifiers. */
+  private final AtomicInteger ctrlCtxIds = new AtomicInteger();
+
+  /** Running sum of the line counts reported by completed tasks. */
+  private final AtomicInteger lineCnt = new AtomicInteger();
+
+  /**
+   * Counts DOWN: starts at the number of partitions and is decremented for every
+   * completed task, so it holds the number of tasks still outstanding.
+   */
+  private final AtomicInteger completedDataTasks = new AtomicInteger();
+
+  private final DataLoadingService dataLoadingService;
+
+  /**
+   * @param dataLoadingService service used to recognize data-loaded contexts and to
+   *                           learn the number of partitions (= tasks to wait for).
+   */
+  @Inject
+  public LineCounter(final DataLoadingService dataLoadingService) {
+    this.dataLoadingService = dataLoadingService;
+    this.completedDataTasks.set(dataLoadingService.getNumberOfPartitions());
+  }
+
+  /**
+   * Reacts to active contexts: data-loaded contexts get a line counting context on top,
+   * line counting contexts get a task, every other context is closed.
+   */
+  public class ContextActiveHandler implements EventHandler<ActiveContext> {
+
+    @Override
+    public void onNext(final ActiveContext activeContext) {
+
+      final String contextId = activeContext.getId();
+      LOG.log(Level.FINER, "Context active: {0}", contextId);
+
+      if (dataLoadingService.isDataLoadedContext(activeContext)) {
+
+        final String lcContextId = "LineCountCtxt-" + ctrlCtxIds.getAndIncrement();
+        LOG.log(Level.FINEST, "Submit LineCount context {0} to: {1}",
+            new Object[]{lcContextId, contextId});
+
+        // The new context is merged with a poison configuration (crash probability 0.4,
+        // crash timeout 1).
+        final Configuration poisonedConfiguration = PoisonedConfiguration.CONTEXT_CONF
+            .set(PoisonedConfiguration.CRASH_PROBABILITY, "0.4")
+            .set(PoisonedConfiguration.CRASH_TIMEOUT, "1")
+            .build();
+
+        activeContext.submitContext(Tang.Factory.getTang()
+            .newConfigurationBuilder(poisonedConfiguration,
+                ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER, lcContextId).build())
+            .build());
+
+      } else if (contextId.startsWith("LineCountCtxt")) {
+
+        final String taskId = "LineCountTask-" + ctrlCtxIds.getAndIncrement();
+        LOG.log(Level.FINEST, "Submit LineCount task {0} to: {1}", new Object[]{taskId, contextId});
+
+        try {
+          activeContext.submitTask(TaskConfiguration.CONF
+              .set(TaskConfiguration.IDENTIFIER, taskId)
+              .set(TaskConfiguration.TASK, LineCountingTask.class)
+              .build());
+        } catch (final BindException ex) {
+          LOG.log(Level.SEVERE, "Configuration error in " + contextId, ex);
+          throw new RuntimeException("Configuration error in " + contextId, ex);
+        }
+      } else {
+        LOG.log(Level.FINEST, "Line count Compute Task {0} -- Closing", contextId);
+        activeContext.close();
+      }
+    }
+  }
+
+  /**
+   * Collects the line count of every completed task, logs the total after the last
+   * expected task and closes the context the task ran in.
+   */
+  public class TaskCompletedHandler implements EventHandler<CompletedTask> {
+    @Override
+    public void onNext(final CompletedTask completedTask) {
+
+      final String taskId = completedTask.getId();
+      LOG.log(Level.FINEST, "Completed Task: {0}", taskId);
+
+      final byte[] retBytes = completedTask.get();
+      if (retBytes == null) {
+        // Previously the placeholder "No RetVal" was fed to Integer.parseInt(), which
+        // always threw and skipped the bookkeeping and the context close below.
+        LOG.log(Level.WARNING, "Task {0} returned no value; nothing added to the line count",
+            taskId);
+      } else {
+        final String retStr = new String(retBytes);
+        LOG.log(Level.FINE, "Line count from {0} : {1}", new Object[]{taskId, retStr});
+        lineCnt.addAndGet(Integer.parseInt(retStr));
+      }
+
+      if (completedDataTasks.decrementAndGet() <= 0) {
+        LOG.log(Level.INFO, "Total line count: {0}", lineCnt.get());
+      }
+
+      final ActiveContext taskContext = completedTask.getActiveContext();
+      LOG.log(Level.FINEST, "Releasing Context: {0}", taskContext.getId());
+      taskContext.close();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/loading/LineCountingTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/loading/LineCountingTask.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/loading/LineCountingTask.java
new file mode 100644
index 0000000..1d6ab8e
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/loading/LineCountingTask.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.data.loading;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.io.data.loading.api.DataSet;
+import org.apache.reef.io.network.util.Pair;
+import org.apache.reef.task.Task;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Task that walks over its data set and counts the records it contains.
+ * With TextInputFormat, which this task assumes, every record is one line.
+ */
+@TaskSide
+public class LineCountingTask implements Task {
+
+  private static final Logger LOG = Logger.getLogger(LineCountingTask.class.getName());
+
+  private final DataSet<LongWritable, Text> dataSet;
+
+  /**
+   * @param dataSet the records this task iterates over.
+   */
+  @Inject
+  public LineCountingTask(final DataSet<LongWritable, Text> dataSet) {
+    this.dataSet = dataSet;
+  }
+
+  /**
+   * Counts the records of the data set.
+   *
+   * @param memento not used.
+   * @return the count as a decimal string, converted to bytes.
+   */
+  @Override
+  public byte[] call(final byte[] memento) throws Exception {
+    LOG.log(Level.FINER, "LineCounting task started");
+    int lineCount = 0;
+    // Only the number of records matters; their content is never inspected.
+    for (final Pair<LongWritable, Text> ignored : dataSet) {
+      lineCount += 1;
+    }
+    LOG.log(Level.FINER, "LineCounting task finished: read {0} lines", lineCount);
+    final String countAsText = Integer.toString(lineCount);
+    return countAsText.getBytes();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriver.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriver.java
new file mode 100644
index 0000000..8344a44
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriver.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.hello;
+
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver of the Hello REEF application: requests one Evaluator when the driver starts
+ * and submits {@link HelloTask} to the Evaluator once it is allocated.
+ */
+@Unit
+public final class HelloDriver {
+
+  private static final Logger LOG = Logger.getLogger(HelloDriver.class.getName());
+
+  private final EvaluatorRequestor requestor;
+
+  /**
+   * Job driver constructor - instantiated via TANG.
+   *
+   * @param requestor evaluator requestor object used to create new evaluator containers.
+   */
+  @Inject
+  public HelloDriver(final EvaluatorRequestor requestor) {
+    this.requestor = requestor;
+    LOG.log(Level.FINE, "Instantiated 'HelloDriver'");
+  }
+
+  /**
+   * Handles the StartTime event: requests a single Evaluator (64 memory units, one core).
+   */
+  public final class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime startTime) {
+      final EvaluatorRequest singleEvaluator = EvaluatorRequest.newBuilder()
+          .setNumber(1)
+          .setMemory(64)
+          .setNumberOfCores(1)
+          .build();
+      requestor.submit(singleEvaluator);
+      LOG.log(Level.INFO, "Requested Evaluator.");
+    }
+  }
+
+  /**
+   * Handles AllocatedEvaluator: builds the HelloTask configuration and submits it.
+   */
+  public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+      LOG.log(Level.INFO, "Submitting HelloREEF task to AllocatedEvaluator: {0}", allocatedEvaluator);
+      final Configuration helloTaskConf = TaskConfiguration.CONF
+          .set(TaskConfiguration.IDENTIFIER, "HelloREEFTask")
+          .set(TaskConfiguration.TASK, HelloTask.class)
+          .build();
+      allocatedEvaluator.submitTask(helloTaskConf);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEF.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEF.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEF.java
new file mode 100644
index 0000000..ffaa304
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEF.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.hello;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.util.EnvironmentUtils;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Client of the Hello REEF example: configures the {@link HelloDriver} and runs it
+ * on the local runtime.
+ */
+public final class HelloREEF {
+
+  private static final Logger LOG = Logger.getLogger(HelloREEF.class.getName());
+
+  /**
+   * How long to wait for the job to complete, in milliseconds (10 seconds).
+   */
+  private static final int JOB_TIMEOUT = 10000;
+
+  /**
+   * Assembles the driver configuration: the library location of {@link HelloDriver},
+   * the driver identifier and the two HelloDriver event handlers.
+   *
+   * @return the configuration of the HelloREEF driver.
+   */
+  public static Configuration getDriverConfiguration() {
+    final String driverClassLocation = EnvironmentUtils.getClassLocation(HelloDriver.class);
+    return DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, driverClassLocation)
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, "HelloREEF")
+        .set(DriverConfiguration.ON_DRIVER_STARTED, HelloDriver.StartHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, HelloDriver.EvaluatorAllocatedHandler.class)
+        .build();
+  }
+
+  /**
+   * Runs the Hello REEF driver on the given runtime.
+   *
+   * @param runtimeConf configuration of the runtime to launch on.
+   * @param timeOut     timeout handed to the launcher.
+   * @return the status reported by the launcher.
+   * @throws BindException      configuration error.
+   * @throws InjectionException configuration error.
+   */
+  public static LauncherStatus runHelloReef(final Configuration runtimeConf, final int timeOut)
+      throws BindException, InjectionException {
+    return DriverLauncher.getLauncher(runtimeConf).run(getDriverConfiguration(), timeOut);
+  }
+
+  /**
+   * Start Hello REEF job on the local runtime with two threads. Runs method runHelloReef().
+   *
+   * @param args command line parameters.
+   * @throws BindException      configuration error.
+   * @throws InjectionException configuration error.
+   */
+  public static void main(final String[] args) throws BindException, InjectionException {
+    final Configuration localRuntimeConf = LocalRuntimeConfiguration.CONF
+        .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, 2)
+        .build();
+    final LauncherStatus jobStatus = runHelloReef(localRuntimeConf, JOB_TIMEOUT);
+    LOG.log(Level.INFO, "REEF job completed: {0}", jobStatus);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFMesos.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFMesos.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFMesos.java
new file mode 100644
index 0000000..b3b962b
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFMesos.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.hello;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.runtime.mesos.client.MesosClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Client for running the Hello REEF example on Mesos.
+ */
+public class HelloREEFMesos {
+  private static final Logger LOG = Logger.getLogger(HelloREEFMesos.class.getName());
+
+  /**
+   * @return the configuration of the HelloREEF driver.
+   */
+  private static Configuration getDriverConfiguration() {
+    // Location this class was loaded from; registered under GLOBAL_LIBRARIES.
+    final String codeLocation =
+        HelloREEFMesos.class.getProtectionDomain().getCodeSource().getLocation().getFile();
+    return DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, codeLocation)
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, "HelloREEF")
+        .set(DriverConfiguration.ON_DRIVER_STARTED, HelloDriver.StartHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, HelloDriver.EvaluatorAllocatedHandler.class)
+        .build();
+  }
+
+  /**
+   * Launches the Hello REEF driver on Mesos and logs the launcher status.
+   * MASTER_IP (the Mesos master address) is set to "localhost:5050";
+   * change it to suit your cluster environment.
+   *
+   * @param args command line parameters; not read.
+   * @throws InjectionException configuration error.
+   */
+  public static void main(final String[] args) throws InjectionException {
+    final Configuration mesosRuntimeConf = MesosClientConfiguration.CONF
+        .set(MesosClientConfiguration.MASTER_IP, "localhost:5050")
+        .build();
+    // No timeout argument is passed to run() here.
+    final LauncherStatus launcherStatus =
+        DriverLauncher.getLauncher(mesosRuntimeConf).run(getDriverConfiguration());
+    LOG.log(Level.INFO, "REEF job completed: {0}", launcherStatus);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFNoClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFNoClient.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFNoClient.java
new file mode 100644
index 0000000..066b336
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFNoClient.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.hello;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.REEF;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.util.EnvironmentUtils;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Entry point that submits the hello REEF driver and returns right after
+ * submission, i.e. without a persistent client connection.
+ */
+public final class HelloREEFNoClient {
+
+  private static final Logger LOG = Logger.getLogger(HelloREEFNoClient.class.getName());
+
+  /**
+   * @return the configuration of the HelloREEF driver.
+   */
+  private static Configuration buildDriverConfiguration() {
+    return DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(HelloDriver.class))
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, "HelloREEF")
+        .set(DriverConfiguration.ON_DRIVER_STARTED, HelloDriver.StartHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, HelloDriver.EvaluatorAllocatedHandler.class)
+        .build();
+  }
+
+  /**
+   * Obtains a REEF instance from the given runtime configuration and submits
+   * the HelloREEF driver to it.
+   *
+   * @param runtimeConf configuration of the runtime to submit to.
+   * @throws InjectionException configuration error.
+   */
+  public static void runHelloReefWithoutClient(
+      final Configuration runtimeConf) throws InjectionException {
+
+    // The REEF client object is injected before the driver configuration is built.
+    final REEF reefClient = Tang.Factory.getTang().newInjector(runtimeConf).getInstance(REEF.class);
+    final Configuration helloDriverConf = buildDriverConfiguration();
+
+    reefClient.submit(helloDriverConf);
+  }
+
+  /**
+   * Submits the job to a local runtime with NUMBER_OF_THREADS set to 2.
+   *
+   * @param args command line parameters; not read.
+   * @throws BindException      configuration error.
+   * @throws InjectionException configuration error.
+   */
+  public static void main(final String[] args) throws BindException, InjectionException {
+
+    final Configuration localRuntimeConf = LocalRuntimeConfiguration.CONF
+        .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, 2)
+        .build();
+
+    runHelloReefWithoutClient(localRuntimeConf);
+    LOG.log(Level.INFO, "Job Submitted");
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloReefYarn.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloReefYarn.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloReefYarn.java
new file mode 100644
index 0000000..a9d95a6
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloReefYarn.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.hello;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The Client for the Hello REEF example, using the YARN client configuration.
+ */
+public final class HelloReefYarn {
+
+  private static final Logger LOG = Logger.getLogger(HelloReefYarn.class.getName());
+
+  /**
+   * Number of milliseconds to wait for the job to complete.
+   */
+  private static final int JOB_TIMEOUT = 30000; // 30 sec.
+
+  /**
+   * @return the configuration of the HelloREEF driver.
+   */
+  private static Configuration getDriverConfiguration() {
+    // Location this class was loaded from; registered under GLOBAL_LIBRARIES.
+    final String codeLocation =
+        HelloReefYarn.class.getProtectionDomain().getCodeSource().getLocation().getFile();
+    return DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, codeLocation)
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, "HelloREEF")
+        .set(DriverConfiguration.ON_DRIVER_STARTED, HelloDriver.StartHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, HelloDriver.EvaluatorAllocatedHandler.class)
+        .build();
+  }
+
+  /**
+   * Start Hello REEF job through DriverLauncher and log the launcher status.
+   *
+   * @param args command line parameters; not read.
+   * @throws org.apache.reef.tang.exceptions.InjectionException configuration error.
+   */
+  public static void main(final String[] args) throws InjectionException {
+
+    final Configuration yarnRuntimeConf = YarnClientConfiguration.CONF.build();
+    final LauncherStatus launcherStatus =
+        DriverLauncher.getLauncher(yarnRuntimeConf).run(getDriverConfiguration(), JOB_TIMEOUT);
+    LOG.log(Level.INFO, "REEF job completed: {0}", launcherStatus);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloTask.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloTask.java
new file mode 100644
index 0000000..a53fb8b
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloTask.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.hello;
+
+import org.apache.reef.task.Task;
+
+import javax.inject.Inject;
+
+/**
+ * A 'hello REEF' Task: writes a greeting to standard output.
+ */
+public final class HelloTask implements Task {
+
+  /**
+   * The line written to standard output by call().
+   */
+  private static final String GREETING = "Hello, REEF!";
+
+  /**
+   * Package-visible constructor marked for injection.
+   */
+  @Inject
+  HelloTask() {
+  }
+
+  /**
+   * Prints the greeting.
+   *
+   * @param memento not read by this task.
+   * @return always null.
+   */
+  @Override
+  public final byte[] call(final byte[] memento) {
+    System.out.println(GREETING);
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/package-info.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/package-info.java
new file mode 100644
index 0000000..66bc058
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * The Hello REEF example.
+ */
+package org.apache.reef.examples.hello;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HelloREEFHttp.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HelloREEFHttp.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HelloREEFHttp.java
new file mode 100644
index 0000000..a92499f
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HelloREEFHttp.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.hellohttp;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.DriverServiceConfiguration;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.webserver.HttpHandlerConfiguration;
+import org.apache.reef.webserver.HttpServerReefEventHandler;
+import org.apache.reef.webserver.ReefEventStateManager;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Example to run HelloREEF with a webserver.
+ */
+public final class HelloREEFHttp {
+  /**
+   * Number of milliseconds to wait for the job to complete.
+   */
+  public static final int JOB_TIMEOUT = 60 * 1000; // 60 sec.
+  private static final Logger LOG = Logger.getLogger(HelloREEFHttp.class.getName());
+
+  /**
+   * @return the driver-side configuration to be merged into the DriverConfiguration to enable the HTTP server.
+   */
+  public static Configuration getHTTPConfiguration() {
+    // Two handlers are registered under HTTP_HANDLERS: the REEF event handler
+    // and this example's shell command handler.
+    final Configuration httpHandlerConfiguration = HttpHandlerConfiguration.CONF
+        .set(HttpHandlerConfiguration.HTTP_HANDLERS, HttpServerReefEventHandler.class)
+        .set(HttpHandlerConfiguration.HTTP_HANDLERS, HttpServerShellCmdtHandler.class)
+        .build();
+    // ReefEventStateManager handlers are bound to the driver events listed below.
+    final Configuration driverConfigurationForHttpServer = DriverServiceConfiguration.CONF
+        .set(DriverServiceConfiguration.ON_EVALUATOR_ALLOCATED, ReefEventStateManager.AllocatedEvaluatorStateHandler.class)
+        .set(DriverServiceConfiguration.ON_CONTEXT_ACTIVE, ReefEventStateManager.ActiveContextStateHandler.class)
+        .set(DriverServiceConfiguration.ON_TASK_RUNNING, ReefEventStateManager.TaskRunningStateHandler.class)
+        .set(DriverServiceConfiguration.ON_DRIVER_STARTED, ReefEventStateManager.StartStateHandler.class)
+        .set(DriverServiceConfiguration.ON_DRIVER_STOP, ReefEventStateManager.StopStateHandler.class)
+        .build();
+    return Configurations.merge(httpHandlerConfiguration, driverConfigurationForHttpServer);
+  }
+
+  /**
+   * @return the configuration of the HelloREEF driver.
+   */
+  public static Configuration getDriverConfiguration() {
+    return DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(HttpShellJobDriver.class))
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, "HelloHTTP")
+        .set(DriverConfiguration.ON_DRIVER_STARTED, HttpShellJobDriver.StartHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, HttpShellJobDriver.AllocatedEvaluatorHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_FAILED, HttpShellJobDriver.FailedEvaluatorHandler.class)
+        .set(DriverConfiguration.ON_CONTEXT_ACTIVE, HttpShellJobDriver.ActiveContextHandler.class)
+        .set(DriverConfiguration.ON_CONTEXT_CLOSED, HttpShellJobDriver.ClosedContextHandler.class)
+        .set(DriverConfiguration.ON_CONTEXT_FAILED, HttpShellJobDriver.FailedContextHandler.class)
+        .set(DriverConfiguration.ON_TASK_COMPLETED, HttpShellJobDriver.CompletedTaskHandler.class)
+        .set(DriverConfiguration.ON_CLIENT_MESSAGE, HttpShellJobDriver.ClientMessageHandler.class)
+        .set(DriverConfiguration.ON_CLIENT_CLOSED, HttpShellJobDriver.HttpClientCloseHandler.class)
+        .set(DriverConfiguration.ON_DRIVER_STOP, HttpShellJobDriver.StopHandler.class)
+        .build();
+  }
+
+  /**
+   * Run Hello Reef with the driver configuration and the HTTP configuration merged.
+   *
+   * @param runtimeConf configuration of the runtime to launch the driver on.
+   * @param timeOut     timeout value handed unchanged to DriverLauncher.run();
+   *                    callers in this package pass JOB_TIMEOUT.
+   * @return the status reported by the launcher.
+   * @throws BindException      configuration error.
+   * @throws InjectionException configuration error.
+   */
+  public static LauncherStatus runHelloReef(final Configuration runtimeConf, final int timeOut)
+      throws BindException, InjectionException {
+    final Configuration driverConf = Configurations.merge(HelloREEFHttp.getDriverConfiguration(), getHTTPConfiguration());
+    return DriverLauncher.getLauncher(runtimeConf).run(driverConf, timeOut);
+  }
+
+  /**
+   * Main program: runs the example on a local runtime with NUMBER_OF_THREADS
+   * set to 3 and logs the launcher status.
+   *
+   * @param args command line parameters; not read.
+   * @throws InjectionException configuration error.
+   */
+  public static void main(final String[] args) throws InjectionException {
+    final Configuration runtimeConfiguration = LocalRuntimeConfiguration.CONF
+        .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, 3)
+        .build();
+    final LauncherStatus status = runHelloReef(runtimeConfiguration, HelloREEFHttp.JOB_TIMEOUT);
+    // Report the outcome instead of silently dropping it; same message as HelloREEFHttpYarn.
+    LOG.log(Level.INFO, "REEF job completed: {0}", status);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HelloREEFHttpYarn.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HelloREEFHttpYarn.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HelloREEFHttpYarn.java
new file mode 100644
index 0000000..f3b05de
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HelloREEFHttpYarn.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.hellohttp;
+
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Runs the HelloREEFHttp example with the YARN client configuration.
+ */
+public class HelloREEFHttpYarn {
+
+  private static final Logger LOG = Logger.getLogger(HelloREEFHttpYarn.class.getName());
+
+  /**
+   * Start Hello REEF job by delegating to HelloREEFHttp.runHelloReef()
+   * and log the launcher status.
+   *
+   * @param args command line parameters; not read.
+   * @throws org.apache.reef.tang.exceptions.BindException      configuration error.
+   * @throws org.apache.reef.tang.exceptions.InjectionException configuration error.
+   * @throws IOException declared by this method; nothing in its body raises it directly.
+   */
+  public static void main(final String[] args) throws BindException, InjectionException, IOException {
+
+    // The YARN runtime configuration is built in place and passed straight through.
+    final LauncherStatus launcherStatus =
+        HelloREEFHttp.runHelloReef(YarnClientConfiguration.CONF.build(), HelloREEFHttp.JOB_TIMEOUT);
+    LOG.log(Level.INFO, "REEF job completed: {0}", launcherStatus);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HttpServerShellCmdtHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HttpServerShellCmdtHandler.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HttpServerShellCmdtHandler.java
new file mode 100644
index 0000000..ff22d81
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HttpServerShellCmdtHandler.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.hellohttp;
+
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.util.CommandUtils;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.webserver.HttpHandler;
+import org.apache.reef.webserver.ParsedHttpRequest;
+
+import javax.inject.Inject;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Http Event handler for Shell Command
+ */
+@Unit
+class HttpServerShellCmdtHandler implements HttpHandler {
+ /**
+ * Standard Java logger.
+ */
+ private static final Logger LOG = Logger.getLogger(HttpServerShellCmdtHandler.class.getName());
+
+ private static final int WAIT_TIMEOUT = 10 * 1000;
+
+ private static final int WAIT_TIME = 50;
+
+ /**
+ * ClientMessageHandler
+ */
+ private final InjectionFuture<HttpShellJobDriver.ClientMessageHandler> messageHandler;
+
+ /**
+ * uri specification
+ */
+ private String uriSpecification = "Command";
+
+ /**
+ * output for command
+ */
+ private String cmdOutput = null;
+
+ /**
+ * HttpServerDistributedShellEventHandler constructor.
+ */
+ @Inject
+ public HttpServerShellCmdtHandler(final InjectionFuture<HttpShellJobDriver.ClientMessageHandler> messageHandler) {
+ this.messageHandler = messageHandler;
+ }
+
+ /**
+ * returns URI specification for the handler
+ *
+ * @return
+ */
+ @Override
+ public String getUriSpecification() {
+ return uriSpecification;
+ }
+
+ /**
+ * set URI specification
+ *
+ * @param s
+ */
+ public void setUriSpecification(final String s) {
+ uriSpecification = s;
+ }
+
+ /**
+ * it is called when receiving a http request
+ *
+ * @param parsedHttpRequest
+ * @param response
+ */
+ @Override
+ public final synchronized void onHttpRequest(final ParsedHttpRequest parsedHttpRequest, final HttpServletResponse response) throws IOException, ServletException {
+ LOG.log(Level.INFO, "HttpServeShellCmdtHandler in webserver onHttpRequest is called: {0}", parsedHttpRequest.getRequestUri());
+ final Map<String, List<String>> queries = parsedHttpRequest.getQueryMap();
+ final String queryStr = parsedHttpRequest.getQueryString();
+
+ if (parsedHttpRequest.getTargetEntity().equalsIgnoreCase("Evaluators")) {
+ final byte[] b = HttpShellJobDriver.CODEC.encode(queryStr);
+ LOG.log(Level.INFO, "HttpServeShellCmdtHandler call HelloDriver onCommand(): {0}", queryStr);
+ messageHandler.get().onNext(b);
+
+ notify();
+
+ final long endTime = System.currentTimeMillis() + WAIT_TIMEOUT;
+ while (cmdOutput == null) {
+ final long waitTime = endTime - System.currentTimeMillis();
+ if (waitTime <= 0) {
+ break;
+ }
+
+ try {
+ wait(WAIT_TIME);
+ } catch (final InterruptedException e) {
+ LOG.log(Level.WARNING, "HttpServeShellCmdtHandler onHttpRequest InterruptedException: {0}", e);
+ }
+ }
+ response.getOutputStream().write(cmdOutput.getBytes(Charset.forName("UTF-8")));
+ cmdOutput = null;
+ } else if (parsedHttpRequest.getTargetEntity().equalsIgnoreCase("Driver")) {
+ final String cmdOutput = CommandUtils.runCommand(queryStr);
+ response.getOutputStream().write(cmdOutput.getBytes(Charset.forName("UTF-8")));
+ }
+ }
+
+ /**
+ * called after shell command is completed
+ *
+ * @param message
+ */
+ public final synchronized void onHttpCallback(byte[] message) {
+ final long endTime = System.currentTimeMillis() + WAIT_TIMEOUT;
+ while (cmdOutput != null) {
+ final long waitTime = endTime - System.currentTimeMillis();
+ if (waitTime <= 0) {
+ break;
+ }
+
+ try {
+ wait(WAIT_TIME);
+ } catch (final InterruptedException e) {
+ LOG.log(Level.WARNING, "HttpServeShellCmdtHandler onHttpCallback InterruptedException: {0}", e);
+ }
+ }
+ LOG.log(Level.INFO, "HttpServeShellCmdtHandler OnCallback: {0}", HttpShellJobDriver.CODEC.decode(message));
+ cmdOutput = HttpShellJobDriver.CODEC.decode(message);
+
+ notify();
+ }
+
+ /**
+ * Handler for client to call back
+ */
+ final class ClientCallBackHandler implements EventHandler<byte[]> {
+ @Override
+ public void onNext(final byte[] message) {
+ HttpServerShellCmdtHandler.this.onHttpCallback(message);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HttpShellJobDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HttpShellJobDriver.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HttpShellJobDriver.java
new file mode 100644
index 0000000..44b62a3
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HttpShellJobDriver.java
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.hellohttp;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.examples.library.Command;
+import org.apache.reef.examples.library.ShellTask;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The Driver code for the Hello REEF Http Distributed Shell Application
+ */
+@Unit
+public final class HttpShellJobDriver {
+
+ /**
+ * String codec is used to encode the results
+ * before passing them back to the client.
+ */
+ public static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
+ private static final Logger LOG = Logger.getLogger(HttpShellJobDriver.class.getName());
+ /**
+ * Evaluator Requester
+ */
+ private final EvaluatorRequestor evaluatorRequestor;
+ /**
+ * Number of Evaluators to request (currently fixed at 2).
+ */
+ private final int numEvaluators = 2;
+ /**
+ * Shell execution results from each Evaluator.
+ */
+ private final List<String> results = new ArrayList<>();
+ /**
+ * Map from context ID to running evaluator context.
+ */
+ private final Map<String, ActiveContext> contexts = new HashMap<>();
+ /**
+ * Job driver state.
+ */
+ private State state = State.INIT;
+ /**
+ * First command to execute. Sometimes client can send us the first command
+ * before Evaluators are available; we need to store this command here.
+ */
+ private String cmd;
+ /**
+ * Number of evaluators/tasks to complete.
+ */
+ private int expectCount = 0;
+ /**
+ * Callback handler for http return message
+ */
+ private HttpServerShellCmdtHandler.ClientCallBackHandler httpCallbackHandler;
+
+ /**
+  * Job Driver Constructor. Stores the injected collaborators; no evaluators are requested here.
+  *
+  * @param requestor             used later to submit the evaluator request.
+  * @param clientCallBackHandler handler that receives the encoded results of each command.
+  */
+ @Inject
+ public HttpShellJobDriver(final EvaluatorRequestor requestor,
+                           final HttpServerShellCmdtHandler.ClientCallBackHandler clientCallBackHandler) {
+   this.evaluatorRequestor = requestor;
+   this.httpCallbackHandler = clientCallBackHandler;
+   LOG.log(Level.FINE, "Instantiated 'HttpShellJobDriver'");
+ }
+
+ /**
+  * Concatenate all collected task results, empty the result list,
+  * and hand the encoded text to the HTTP callback handler.
+  */
+ private void returnResults() {
+   final StringBuilder combined = new StringBuilder();
+   final int count = this.results.size();
+   for (int i = 0; i < count; ++i) {
+     combined.append(this.results.get(i));
+   }
+   this.results.clear();
+   LOG.log(Level.INFO, "Return results to the client:\n{0}", combined);
+   final byte[] encoded = CODEC.encode(combined.toString());
+   this.httpCallbackHandler.onNext(encoded);
+ }
+
+ /**
+ * Submit command to all available evaluators.
+ *
+ * @param command shell command to execute.
+ */
+ private void submit(final String command) {
+ LOG.log(Level.INFO, "Submit command {0} to {1} evaluators. state: {2}",
+ new Object[]{command, this.contexts.size(), this.state});
+ assert (this.state == State.READY);
+ this.expectCount = this.contexts.size();
+ this.state = State.WAIT_TASKS;
+ this.cmd = null;
+ for (final ActiveContext context : this.contexts.values()) {
+ this.submit(context, command);
+ }
+ }
+
+ /**
+  * Build a ShellTask configuration carrying the command and submit it to a single context.
+  * This method is called from <code>submit(command)</code> for every known context.
+  * If the configuration cannot be bound, the context is closed and the error is rethrown unchecked.
+  */
+ private void submit(final ActiveContext context, final String command) {
+   LOG.log(Level.INFO, "Send command {0} to context: {1}", new Object[]{command, context});
+   try {
+     final JavaConfigurationBuilder builder = Tang.Factory.getTang().newConfigurationBuilder();
+     final String taskId = context.getId() + "_task";
+     builder.addConfiguration(TaskConfiguration.CONF
+         .set(TaskConfiguration.IDENTIFIER, taskId)
+         .set(TaskConfiguration.TASK, ShellTask.class)
+         .build());
+     builder.bindNamedParameter(Command.class, command);
+     context.submitTask(builder.build());
+   } catch (final BindException ex) {
+     LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex);
+     context.close();
+     throw new RuntimeException(ex);
+   }
+ }
+
+ /**
+ * Request the evaluators.
+ */
+ private synchronized void requestEvaluators() {
+ assert (this.state == State.INIT);
+ LOG.log(Level.INFO, "Schedule on {0} Evaluators.", this.numEvaluators);
+ this.evaluatorRequestor.submit(
+ EvaluatorRequest.newBuilder()
+ .setMemory(128)
+ .setNumberOfCores(1)
+ .setNumber(this.numEvaluators).build()
+ );
+ this.state = State.WAIT_EVALUATORS;
+ this.expectCount = this.numEvaluators;
+ }
+
+ /**
+  * States the job driver moves through:
+  * <dl>
+  * <dt><code>INIT</code></dt><dd>Initial state, ready to request the evaluators.</dd>
+  * <dt><code>WAIT_EVALUATORS</code></dt><dd>Waiting for the requested evaluators to initialize.</dd>
+  * <dt><code>READY</code></dt><dd>Ready to submit a new command.</dd>
+  * <dt><code>WAIT_TASKS</code></dt><dd>Waiting for the submitted tasks to complete.</dd>
+  * </dl>
+  */
+ private enum State {
+   INIT,
+   WAIT_EVALUATORS,
+   READY,
+   WAIT_TASKS
+ }
+
+ /**
+ * Receive notification that an Evaluator had been allocated,
+ * and submitTask a new Task in that Evaluator.
+ */
+ final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
+ @Override
+ public void onNext(final AllocatedEvaluator eval) {
+ synchronized (HttpShellJobDriver.this) {
+ LOG.log(Level.INFO, "Allocated Evaluator: {0} expect {1} running {2}",
+ new Object[]{eval.getId(), HttpShellJobDriver.this.expectCount, HttpShellJobDriver.this.contexts.size()});
+ assert (HttpShellJobDriver.this.state == State.WAIT_EVALUATORS);
+ try {
+ eval.submitContext(ContextConfiguration.CONF.set(
+ ContextConfiguration.IDENTIFIER, eval.getId() + "_context").build());
+ } catch (final BindException ex) {
+ LOG.log(Level.SEVERE, "Failed to submit a context to evaluator: " + eval.getId(), ex);
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+ }
+
+ /**
+  * Receive notification that an entire Evaluator has failed.
+  * Removes the contexts that failed with it from the active-context map, then
+  * throws a RuntimeException wrapping the evaluator's exception.
+  */
+ final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
+   @Override
+   public void onNext(final FailedEvaluator eval) {
+     synchronized (HttpShellJobDriver.this) {
+       // The message needs a "{0}" placeholder; without it the log parameter is never rendered.
+       LOG.log(Level.SEVERE, "FailedEvaluator: {0}", eval);
+       for (final FailedContext failedContext : eval.getFailedContextList()) {
+         HttpShellJobDriver.this.contexts.remove(failedContext.getId());
+       }
+       throw new RuntimeException("Failed Evaluator: ", eval.getEvaluatorException());
+     }
+   }
+ }
+
+ /**
+  * Receive notification that a new Context is available and register it.
+  * Once the last expected context has arrived the driver becomes READY and,
+  * if a command is already stored, submits it right away.
+  */
+ final class ActiveContextHandler implements EventHandler<ActiveContext> {
+   @Override
+   public void onNext(final ActiveContext context) {
+     synchronized (HttpShellJobDriver.this) {
+       LOG.log(Level.INFO, "Context available: {0} expect {1} state {2}",
+           new Object[]{context.getId(), expectCount, state});
+       assert (state == State.WAIT_EVALUATORS);
+       contexts.put(context.getId(), context);
+       expectCount -= 1;
+       if (expectCount > 0) {
+         // Still waiting for more contexts.
+         return;
+       }
+       state = State.READY;
+       if (cmd != null) {
+         submit(cmd);
+       } else {
+         LOG.log(Level.INFO, "All evaluators ready; waiting for command. State: {0}", state);
+       }
+     }
+   }
+ }
+
+ /**
+  * Receive notification that a Context has been closed
+  * and drop it from the map of active contexts.
+  */
+ final class ClosedContextHandler implements EventHandler<ClosedContext> {
+   @Override
+   public void onNext(final ClosedContext context) {
+     LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
+     // The context map is only touched while holding the driver's monitor.
+     synchronized (HttpShellJobDriver.this) {
+       contexts.remove(context.getId());
+     }
+   }
+ }
+
+ /**
+  * Receive the close message from the client and close every context known to the driver.
+  */
+ final class HttpClientCloseHandler implements EventHandler<Void> {
+   @Override
+   public void onNext(final Void aVoid) throws RuntimeException {
+     LOG.log(Level.INFO, "Received a close message from the client. You can put code here to properly close drivers and evaluators.");
+     // Snapshot under the driver's monitor: the handlers that add or remove contexts hold it,
+     // so iterating the live map here could race with them.
+     final List<ActiveContext> toClose;
+     synchronized (HttpShellJobDriver.this) {
+       toClose = new ArrayList<>(contexts.values());
+     }
+     for (final ActiveContext c : toClose) {
+       c.close();
+     }
+   }
+ }
+
+ /**
+  * Receive notification that a Context has failed.
+  * Removes the context from the map of active contexts, then throws a
+  * RuntimeException wrapping the context's error.
+  */
+ final class FailedContextHandler implements EventHandler<FailedContext> {
+   @Override
+   public void onNext(final FailedContext context) {
+     // The message needs a "{0}" placeholder; without it the log parameter is never rendered.
+     LOG.log(Level.SEVERE, "FailedContext: {0}", context);
+     synchronized (HttpShellJobDriver.this) {
+       HttpShellJobDriver.this.contexts.remove(context.getId());
+     }
+     throw new RuntimeException("Failed context: ", context.asError());
+   }
+ }
+
+ /**
+  * Receive notification that a Task has completed successfully and record its output.
+  * When the last expected task has reported, the combined results are returned,
+  * the driver becomes READY, and a stored pending command (if any) is submitted.
+  */
+ final class CompletedTaskHandler implements EventHandler<CompletedTask> {
+   @Override
+   public void onNext(final CompletedTask task) {
+     LOG.log(Level.INFO, "Completed task: {0}", task.getId());
+     // Decode the message returned by the task before taking the driver lock.
+     final String output = CODEC.decode(task.get());
+     synchronized (HttpShellJobDriver.this) {
+       results.add(task.getId() + " :: " + output);
+       LOG.log(Level.INFO, "Task {0} result {1}: {2} state: {3}",
+           new Object[]{task.getId(), results.size(), output, state});
+       expectCount -= 1;
+       if (expectCount > 0) {
+         // Other tasks are still running.
+         return;
+       }
+       returnResults();
+       state = State.READY;
+       if (cmd != null) {
+         submit(cmd);
+       }
+     }
+   }
+ }
+
+ /**
+  * Receive a command from the client.
+  * The command is submitted immediately when the driver is READY; otherwise it is stored
+  * so that it can be submitted once the driver becomes ready.
+  */
+ final class ClientMessageHandler implements EventHandler<byte[]> {
+   @Override
+   public void onNext(final byte[] message) {
+     synchronized (HttpShellJobDriver.this) {
+       final String command = CODEC.decode(message);
+       LOG.log(Level.INFO, "Client message: {0} state: {1}", new Object[]{command, state});
+       assert (cmd == null);
+       if (state != State.READY) {
+         // not ready yet - save the command for better times.
+         assert (state == State.WAIT_EVALUATORS);
+         cmd = command;
+         return;
+       }
+       submit(command);
+     }
+   }
+ }
+
+ /**
+  * Handles the StartTime event: logs it and requests the evaluators.
+  */
+ final class StartHandler implements EventHandler<StartTime> {
+   @Override
+   public void onNext(final StartTime startTime) {
+     final Object[] logArgs = {HttpShellJobDriver.this.state, startTime};
+     LOG.log(Level.INFO, "{0} StartTime: {1}", logArgs);
+     assert (HttpShellJobDriver.this.state == State.INIT);
+     HttpShellJobDriver.this.requestEvaluators();
+   }
+ }
+
+ /**
+  * Shutting down the job driver: close every context known to the driver.
+  */
+ final class StopHandler implements EventHandler<StopTime> {
+   @Override
+   public void onNext(final StopTime time) {
+     // Read the state and snapshot the contexts under the driver's monitor: the handlers that
+     // add or remove contexts hold it, so iterating the live map here could race with them.
+     final List<ActiveContext> toClose;
+     synchronized (HttpShellJobDriver.this) {
+       LOG.log(Level.INFO, "{0} StopTime: {1}", new Object[]{state, time});
+       toClose = new ArrayList<>(contexts.values());
+     }
+     for (final ActiveContext context : toClose) {
+       context.close();
+     }
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/library/Command.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/library/Command.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/library/Command.java
new file mode 100644
index 0000000..85d811a
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/library/Command.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.library;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Command line parameter: a command to run. e.g. "echo Hello REEF"
+ */
+@NamedParameter(doc = "The shell command", short_name = "cmd", default_value = "*INTERACTIVE*")
+public final class Command implements Name<String> {
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/library/ShellTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/library/ShellTask.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/library/ShellTask.java
new file mode 100644
index 0000000..0ea7b99
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/library/ShellTask.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.library;
+
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.task.Task;
+import org.apache.reef.util.CommandUtils;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Task that runs a command through CommandUtils and returns the resulting string,
+ * encoded, to the job driver.
+ */
+public class ShellTask implements Task {
+
+  /**
+   * Standard java logger.
+   */
+  private static final Logger LOG = Logger.getLogger(ShellTask.class.getName());
+
+  /**
+   * Codec used to serialize the command output.
+   */
+  private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
+
+  /**
+   * The command this task executes.
+   */
+  private final String command;
+
+  /**
+   * Task constructor. The command is supplied through the Command named parameter.
+   *
+   * @param command a command to execute.
+   */
+  @Inject
+  private ShellTask(@Parameter(Command.class) final String command) {
+    this.command = command;
+  }
+
+  /**
+   * Run the command, log what CommandUtils.runCommand returned, and return it encoded.
+   *
+   * @param memento ignored.
+   * @return the encoded string produced by running the command.
+   */
+  @Override
+  public byte[] call(final byte[] memento) {
+    final String output = CommandUtils.runCommand(this.command);
+    LOG.log(Level.INFO, output);
+    return CODEC.encode(output);
+  }
+}