You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/04/28 22:09:31 UTC
[2/2] incubator-reef git commit: [REEF-65] Replace Retained
Evaluators example with the Scheduler
[REEF-65] Replace Retained Evaluators example with the Scheduler
This removes the Retained Evaluators example.
JIRA:
[REEF-65](https://issues.apache.org/jira/browse/REEF-65)
Pull Request:
This closes #163
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/b9f38b25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/b9f38b25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/b9f38b25
Branch: refs/heads/master
Commit: b9f38b25d4253530d7f72c8178f1424beb8b6d28
Parents: b2e85d1
Author: Yunseong Lee <yu...@apache.org>
Authored: Sat Apr 25 21:27:47 2015 +0900
Committer: Markus Weimer <we...@apache.org>
Committed: Tue Apr 28 13:06:55 2015 -0700
----------------------------------------------------------------------
bin/run.sh | 2 +-
.../Org.Apache.REEF.Examples.csproj | 6 +-
.../RetainedEvalActiveContextHandler.cs | 56 ---
.../RetainedEvalAllocatedEvaluatorHandler.cs | 48 --
.../RetainedEvalEvaluatorRequestorHandler.cs | 47 --
.../Handlers/RetainedEvalStartHandler.cs | 89 ----
lang/java/reef-examples-clr/pom.xml | 35 --
.../examples/retained_evalCLR/JobClient.java | 317 ------------
.../examples/retained_evalCLR/JobDriver.java | 489 -------------------
.../reef/examples/retained_evalCLR/Launch.java | 189 -------
.../examples/retained_evalCLR/package-info.java | 22 -
lang/java/reef-examples/pom.xml | 59 ---
.../reef/examples/retained_eval/JobClient.java | 335 -------------
.../reef/examples/retained_eval/JobDriver.java | 370 --------------
.../reef/examples/retained_eval/Launch.java | 185 -------
.../examples/retained_eval/package-info.java | 22 -
.../examples/scheduler/SchedulerDriver.java | 6 +-
.../reef/tests/examples/ExamplesTestSuite.java | 3 +-
.../tests/examples/TestRetainedEvaluators.java | 81 ---
19 files changed, 6 insertions(+), 2355 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/bin/run.sh
----------------------------------------------------------------------
diff --git a/bin/run.sh b/bin/run.sh
index 21ae5fc..b5bb0a4 100755
--- a/bin/run.sh
+++ b/bin/run.sh
@@ -19,7 +19,7 @@
#
# EXAMPLE USAGE
-# ./run.sh org.apache.reef.examples.retained_eval.Launch -num_eval 2 -local false
+# ./run.sh org.apache.reef.examples.hello.HelloREEF
# RUNTIME
SELF_JAR=`echo $REEF_HOME/reef-examples/target/reef-examples-*-shaded.jar`
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj b/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj
index abed026..1e342ed 100644
--- a/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj
+++ b/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj
@@ -69,10 +69,6 @@ under the License.
<Compile Include="MachineLearning\KMeans\LegacyKMeansTask.cs" />
<Compile Include="MachineLearning\KMeans\PartialMean.cs" />
<Compile Include="MachineLearning\KMeans\ProcessedResults.cs" />
- <Compile Include="RetainedEvalCLRBridge\Handlers\RetainedEvalActiveContextHandler.cs" />
- <Compile Include="RetainedEvalCLRBridge\Handlers\RetainedEvalAllocatedEvaluatorHandler.cs" />
- <Compile Include="RetainedEvalCLRBridge\Handlers\RetainedEvalEvaluatorRequestorHandler.cs" />
- <Compile Include="RetainedEvalCLRBridge\Handlers\RetainedEvalStartHandler.cs" />
<Compile Include="Tasks\FailedTask\FailedTask.cs" />
<Compile Include="Tasks\HelloTask\HelloService.cs" />
<Compile Include="Tasks\HelloTask\HelloTask.cs" />
@@ -117,4 +113,4 @@ under the License.
<Target Name="AfterBuild">
</Target>
-->
-</Project>
\ No newline at end of file
+</Project>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalActiveContextHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalActiveContextHandler.cs b/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalActiveContextHandler.cs
deleted file mode 100644
index 5961e85..0000000
--- a/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalActiveContextHandler.cs
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Driver.Context;
-using Org.Apache.REEF.Examples.Tasks.ShellTask;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-
-namespace Org.Apache.REEF.Examples.RetainedEvalCLRBridge.Handlers
-{
- public class RetainedEvalActiveContextHandler : IObserver<IActiveContext>
- {
- public void OnNext(IActiveContext activeContext)
- {
- ICsConfigurationBuilder cb = TangFactory.GetTang().NewConfigurationBuilder();
- cb.AddConfiguration(TaskConfiguration.ConfigurationModule
- .Set(TaskConfiguration.Identifier, "bridgeCLRShellTask_" + DateTime.Now.Ticks)
- .Set(TaskConfiguration.Task, GenericType<ShellTask>.Class)
- .Build());
- cb.BindNamedParameter<ShellTask.Command, string>(GenericType<ShellTask.Command>.Class, "echo");
-
- IConfiguration taskConfiguration = cb.Build();
-
- activeContext.SubmitTask(taskConfiguration);
- }
-
- public void OnError(Exception error)
- {
- throw new NotImplementedException();
- }
-
- public void OnCompleted()
- {
- throw new NotImplementedException();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalAllocatedEvaluatorHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalAllocatedEvaluatorHandler.cs b/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalAllocatedEvaluatorHandler.cs
deleted file mode 100644
index c591382..0000000
--- a/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalAllocatedEvaluatorHandler.cs
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using Org.Apache.REEF.Driver.Context;
-using Org.Apache.REEF.Driver.Evaluator;
-using Org.Apache.REEF.Tang.Interface;
-
-namespace Org.Apache.REEF.Examples.RetainedEvalCLRBridge.Handlers
-{
- public class RetainedEvalAllocatedEvaluatorHandler : IObserver<IAllocatedEvaluator>
- {
- public void OnCompleted()
- {
- throw new NotImplementedException();
- }
-
- public void OnError(Exception error)
- {
- throw new NotImplementedException();
- }
-
- public void OnNext(IAllocatedEvaluator allocatedEvaluator)
- {
- IConfiguration contextConfiguration = ContextConfiguration.ConfigurationModule
- .Set(ContextConfiguration.Identifier, "RetainedEvalCLRBridgeContextId")
- .Build();
-
- allocatedEvaluator.SubmitContext(contextConfiguration);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalEvaluatorRequestorHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalEvaluatorRequestorHandler.cs b/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalEvaluatorRequestorHandler.cs
deleted file mode 100644
index 172cd4e..0000000
--- a/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalEvaluatorRequestorHandler.cs
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using Org.Apache.REEF.Driver.Evaluator;
-
-namespace Org.Apache.REEF.Examples.RetainedEvalCLRBridge.Handlers
-{
- public class RetainedEvalEvaluatorRequestorHandler : IObserver<IEvaluatorRequestor>
- {
- public void OnNext(IEvaluatorRequestor requestor)
- {
- int evaluatorsNumber = 1;
- int memory = 512;
- string rack = "WonderlandRack";
- EvaluatorRequest request = new EvaluatorRequest(evaluatorsNumber, memory, rack);
-
- requestor.Submit(request);
- }
-
- public void OnCompleted()
- {
- throw new NotImplementedException();
- }
-
- public void OnError(Exception error)
- {
- throw new NotImplementedException();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalStartHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalStartHandler.cs b/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalStartHandler.cs
deleted file mode 100644
index cc2448e..0000000
--- a/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalStartHandler.cs
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Driver;
-using Org.Apache.REEF.Driver.Bridge;
-using Org.Apache.REEF.Driver.Context;
-using Org.Apache.REEF.Driver.Evaluator;
-using Org.Apache.REEF.Examples.Tasks.ShellTask;
-using Org.Apache.REEF.Tang.Annotations;
-
-namespace Org.Apache.REEF.Examples.RetainedEvalCLRBridge.Handlers
-{
- public class RetainedEvalStartHandler : IStartHandler
- {
- private static ClrSystemHandler<IEvaluatorRequestor> _evaluatorRequestorHandler;
- private static ClrSystemHandler<IAllocatedEvaluator> _allocatedEvaluatorHandler;
- private static ClrSystemHandler<IActiveContext> _activeContextHandler;
-
- [Inject]
- public RetainedEvalStartHandler()
- {
- CreateClassHierarchy();
- Identifier = "RetainedEvalStartHandler";
- }
-
- public RetainedEvalStartHandler(string id)
- {
- Identifier = id;
- CreateClassHierarchy();
- }
-
- public string Identifier { get; set; }
-
- public IList<ulong> GetHandlers()
- {
- ulong[] handlers = Enumerable.Repeat(Constants.NullHandler, Constants.HandlersNumber).ToArray();
-
- // initiate Evaluator Requestor handler
- _evaluatorRequestorHandler = new ClrSystemHandler<IEvaluatorRequestor>();
- handlers[Constants.Handlers[Constants.EvaluatorRequestorHandler]] = ClrHandlerHelper.CreateHandler(_evaluatorRequestorHandler);
- Console.WriteLine("_evaluatorRequestorHandler initiated");
- _evaluatorRequestorHandler.Subscribe(new RetainedEvalEvaluatorRequestorHandler());
-
- // initiate Allocated Evaluator handler
- _allocatedEvaluatorHandler = new ClrSystemHandler<IAllocatedEvaluator>();
- handlers[Constants.Handlers[Constants.AllocatedEvaluatorHandler]] = ClrHandlerHelper.CreateHandler(_allocatedEvaluatorHandler);
- Console.WriteLine("_allocatedEvaluatorHandler initiated");
- _allocatedEvaluatorHandler.Subscribe(new RetainedEvalAllocatedEvaluatorHandler());
-
- // initiate Active Context handler
- _activeContextHandler = new ClrSystemHandler<IActiveContext>();
- handlers[Constants.Handlers[Constants.ActiveContextHandler]] = ClrHandlerHelper.CreateHandler(_activeContextHandler);
- Console.WriteLine("_activeContextHandler initiated");
- _activeContextHandler.Subscribe(new RetainedEvalActiveContextHandler());
-
- return handlers;
- }
-
- private void CreateClassHierarchy()
- {
- HashSet<string> clrDlls = new HashSet<string>();
- clrDlls.Add(typeof(IDriver).Assembly.GetName().Name);
- clrDlls.Add(typeof(ITask).Assembly.GetName().Name);
- clrDlls.Add(typeof(ShellTask).Assembly.GetName().Name);
-
- ClrHandlerHelper.GenerateClassHierarchy(clrDlls);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/java/reef-examples-clr/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples-clr/pom.xml b/lang/java/reef-examples-clr/pom.xml
index 2cb2171..33e5c6a 100644
--- a/lang/java/reef-examples-clr/pom.xml
+++ b/lang/java/reef-examples-clr/pom.xml
@@ -116,42 +116,7 @@ under the License.
</plugins>
</build>
-
<profiles>
-
-
- <profile>
- <id>RetainedEvalCLR</id>
- <build>
- <defaultGoal>exec:exec</defaultGoal>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>exec-maven-plugin</artifactId>
- <configuration>
- <executable>java</executable>
- <arguments>
- <argument>-classpath</argument>
- <classpath/>
- <argument>-Djava.util.logging.config.class=org.apache.reef.util.logging.Config
- </argument>
- <argument>-Dcom.microsoft.reef.runtime.local.folder=${project.build.directory}
- </argument>
- <argument>org.apache.reef.examples.retained_evalCLR.Launch</argument>
- <argument>dotnetDistributedShell</argument>
- <!-- <argument>-cmd</argument>
- <argument>date</argument>
- <argument>-num_runs</argument>
- <argument>20</argument>
- <argument>-local</argument>
- <argument>true</argument> -->
- </arguments>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
-
<profile>
<id>HelloCLR</id>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/JobClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/JobClient.java b/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/JobClient.java
deleted file mode 100644
index 8823c87..0000000
--- a/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/JobClient.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.examples.retained_evalCLR;
-
-import org.apache.reef.client.*;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.annotations.NamedParameter;
-import org.apache.reef.tang.annotations.Parameter;
-import org.apache.reef.tang.annotations.Unit;
-import org.apache.reef.tang.exceptions.BindException;
-import org.apache.reef.tang.formats.ConfigurationModule;
-import org.apache.reef.util.EnvironmentUtils;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
-
-import javax.inject.Inject;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Retained Evaluator Shell Client.
- */
-@Unit
-public class JobClient {
-
- /**
- * Standard java logger.
- */
- private static final Logger LOG = Logger.getLogger(JobClient.class.getName());
-
- /**
- * Codec to translate messages to and from the job driver
- */
- private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
-
- /**
- * Reference to the REEF framework.
- * This variable is injected automatically in the constructor.
- */
- private final REEF reef;
-
- /**
- * Shell command to submitTask to the job driver.
- */
- private final String command;
- /**
- * If true, take commands from stdin; otherwise, use -cmd parameter in batch mode.
- */
- private final boolean isInteractive;
- /**
- * Total number of experiments to run.
- */
- private final int maxRuns;
- /**
- * Command prompt reader for the interactive mode (stdin).
- */
- private final BufferedReader prompt;
- /**
- * Job Driver configuration.
- */
- private Configuration driverConfiguration;
- private ConfigurationModule driverConfigModule;
- /**
- * A reference to the running job that allows client to send messages back to the job driver
- */
- private RunningJob runningJob;
-
- /**
- * Start timestamp of the current task.
- */
- private long startTime = 0;
-
- /**
- * Total time spent performing tasks in Evaluators.
- */
- private long totalTime = 0;
-
- /**
- * Number of experiments ran so far.
- */
- private int numRuns = 0;
-
- /**
- * Set to false when job driver is done.
- */
- private boolean isBusy = true;
-
- /**
- * Retained Evaluator client.
- * Parameters are injected automatically by TANG.
- *
- * @param command Shell command to run on each Evaluator.
- * @param reef Reference to the REEF framework.
- */
- @Inject
- JobClient(final REEF reef,
- @Parameter(Launch.Command.class) final String command,
- @Parameter(Launch.NumRuns.class) final Integer numRuns) throws BindException {
-
- this.reef = reef;
- this.command = command;
- this.maxRuns = numRuns;
-
- // If command is not set, switch to interactive mode. (Yes, we compare pointers here)
- this.isInteractive = this.command ==
- Launch.Command.class.getAnnotation(NamedParameter.class).default_value();
-
- this.prompt = this.isInteractive ? new BufferedReader(new InputStreamReader(System.in)) : null;
-
- this.driverConfigModule = DriverConfiguration.CONF
- .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(JobDriver.class))
- .set(DriverConfiguration.DRIVER_IDENTIFIER, "eval-" + System.currentTimeMillis())
- .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, JobDriver.AllocatedEvaluatorHandler.class)
- .set(DriverConfiguration.ON_EVALUATOR_FAILED, JobDriver.FailedEvaluatorHandler.class)
- .set(DriverConfiguration.ON_CONTEXT_ACTIVE, JobDriver.ActiveContextHandler.class)
- .set(DriverConfiguration.ON_CONTEXT_CLOSED, JobDriver.ClosedContextHandler.class)
- .set(DriverConfiguration.ON_CONTEXT_FAILED, JobDriver.FailedContextHandler.class)
- .set(DriverConfiguration.ON_TASK_COMPLETED, JobDriver.CompletedTaskHandler.class)
- .set(DriverConfiguration.ON_CLIENT_MESSAGE, JobDriver.ClientMessageHandler.class)
- .set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class)
- .set(DriverConfiguration.ON_DRIVER_STOP, JobDriver.StopHandler.class);
- }
-
- private void addCLRFiles(final File folder) throws BindException {
- ConfigurationModule result = this.driverConfigModule;
- for (final File f : folder.listFiles()) {
- if (f.canRead() && f.exists() && f.isFile()) {
- result = result.set(DriverConfiguration.GLOBAL_FILES, f.getAbsolutePath());
- }
- }
-
- this.driverConfigModule = result;
- this.driverConfiguration = this.driverConfigModule.build();
- }
-
- /**
- * Launch the job driver.
- *
- * @throws BindException configuration error.
- */
- public void submit(File clrFolder) {
- try {
- addCLRFiles(clrFolder);
- } catch (final BindException e) {
- LOG.log(Level.FINE, "Failed to bind", e);
- }
- this.reef.submit(this.driverConfiguration);
- }
-
- /**
- * Send command to the job driver. Record timestamp when the command was sent.
- * If this.command is set, use it; otherwise, ask user for the command.
- */
- private synchronized void submitTask() {
- if (this.isInteractive) {
- String cmd;
- try {
- do {
- System.out.print("\nRE> ");
- cmd = this.prompt.readLine();
- } while (cmd != null && cmd.trim().isEmpty());
- } catch (final IOException ex) {
- LOG.log(Level.FINE, "Error reading from stdin: {0}", ex);
- cmd = null;
- }
- if (cmd == null || cmd.equals("exit")) {
- this.runningJob.close();
- stopAndNotify();
- } else {
- this.submitTask(cmd);
- }
- } else {
- // non-interactive batch mode:
- this.submitTask(this.command);
- }
- }
-
- /**
- * Send command to the job driver. Record timestamp when the command was sent.
- *
- * @param cmd shell command to execute in all Evaluators.
- */
- private synchronized void submitTask(final String cmd) {
- LOG.log(Level.INFO, "Submit task {0} \"{1}\" to {2}",
- new Object[]{this.numRuns + 1, cmd, this.runningJob});
- this.startTime = System.currentTimeMillis();
- this.runningJob.send(CODEC.encode(cmd));
- }
-
- /**
- * Notify the process in waitForCompletion() method that the main process has finished.
- */
- private synchronized void stopAndNotify() {
- this.runningJob = null;
- this.isBusy = false;
- this.notify();
- }
-
- /**
- * Wait for the job driver to complete. This method is called from Launcher.main()
- */
- public void waitForCompletion() {
- while (this.isBusy) {
- LOG.info("Waiting for the Job Driver to complete.");
- try {
- synchronized (this) {
- this.wait();
- }
- } catch (final InterruptedException ex) {
- LOG.log(Level.WARNING, "Waiting for result interrupted.", ex);
- }
- }
- this.reef.close();
- }
-
- /**
- * Receive notification from the job driver that the job is running.
- */
- final class RunningJobHandler implements EventHandler<RunningJob> {
- @Override
- public void onNext(final RunningJob job) {
- LOG.log(Level.INFO, "Running job: {0}", job.getId());
- synchronized (JobClient.this) {
- JobClient.this.runningJob = job;
- JobClient.this.submitTask();
- }
- }
- }
-
- /**
- * Receive message from the job driver.
- * There is only one message, which comes at the end of the driver execution
- * and contains shell command output on each node.
- */
- final class JobMessageHandler implements EventHandler<JobMessage> {
- @Override
- public void onNext(final JobMessage message) {
- synchronized (JobClient.this) {
-
- final String result = CODEC.decode(message.get());
- final long jobTime = System.currentTimeMillis() - startTime;
- totalTime += jobTime;
- ++numRuns;
-
- LOG.log(Level.INFO, "Task {0} completed in {1} msec.:\n{2}",
- new Object[]{numRuns, jobTime, result});
-
- System.out.println(result);
-
- if (runningJob != null) {
- if (isInteractive || numRuns < maxRuns) {
- submitTask();
- } else {
- LOG.log(Level.INFO,
- "All {0} tasks complete; Average task time: {1}. Closing the job driver.",
- new Object[]{maxRuns, totalTime / (double) maxRuns});
- runningJob.close();
- stopAndNotify();
- }
- }
- }
- }
- }
-
- /**
- * Receive notification from the job driver that the job had failed.
- */
- final class FailedJobHandler implements EventHandler<FailedJob> {
- @Override
- public void onNext(final FailedJob job) {
- LOG.log(Level.SEVERE, "Failed job: " + job.getId(), job.getReason().orElse(null));
- stopAndNotify();
- }
- }
-
- /**
- * Receive notification from the job driver that the job had completed successfully.
- */
- final class CompletedJobHandler implements EventHandler<CompletedJob> {
- @Override
- public void onNext(final CompletedJob job) {
- LOG.log(Level.INFO, "Completed job: {0}", job.getId());
- stopAndNotify();
- }
- }
-
- /**
- * Receive notification that there was an exception thrown from the job driver.
- */
- final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
- @Override
- public void onNext(final FailedRuntime error) {
- LOG.log(Level.SEVERE, "Error in job driver: " + error, error.getReason().orElse(null));
- stopAndNotify();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/JobDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/JobDriver.java b/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/JobDriver.java
deleted file mode 100644
index c4f95e1..0000000
--- a/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/JobDriver.java
+++ /dev/null
@@ -1,489 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.examples.retained_evalCLR;
-
-import org.apache.reef.driver.catalog.ResourceCatalog;
-import org.apache.reef.driver.client.JobMessageObserver;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.context.ClosedContext;
-import org.apache.reef.driver.context.ContextConfiguration;
-import org.apache.reef.driver.context.FailedContext;
-import org.apache.reef.driver.evaluator.*;
-import org.apache.reef.driver.task.CompletedTask;
-import org.apache.reef.driver.task.TaskConfiguration;
-import org.apache.reef.examples.library.Command;
-import org.apache.reef.examples.library.ShellTask;
-import org.apache.reef.tang.*;
-import org.apache.reef.tang.annotations.Unit;
-import org.apache.reef.tang.exceptions.BindException;
-import org.apache.reef.tang.implementation.protobuf.ProtocolBufferClassHierarchy;
-import org.apache.reef.tang.proto.ClassHierarchyProto;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
-import org.apache.reef.wake.time.Clock;
-import org.apache.reef.wake.time.event.Alarm;
-import org.apache.reef.wake.time.event.StartTime;
-import org.apache.reef.wake.time.event.StopTime;
-
-import javax.inject.Inject;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Retained Evaluator example job driver. Execute shell command on all evaluators,
- * capture stdout, and return concatenated results back to the client.
- */
-@Unit
-public final class JobDriver {
- public static final String SHELL_TASK_CLASS_HIERARCHY_FILENAME = "ShellTask.bin";
- /**
- * Standard Java logger.
- */
- private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());
- /**
- * Duration of one clock interval.
- */
- private static final int CHECK_UP_INTERVAL = 1000; // 1 sec.
- private static final String JVM_CONTEXT_SUFFIX = "_JVMContext";
- private static final String CLR_CONTEXT_SUFFIX = "_CLRContext";
- /**
- * String codec is used to encode the results
- * before passing them back to the client.
- */
- private static final ObjectSerializableCodec<String> JVM_CODEC = new ObjectSerializableCodec<>();
- public static int totalEvaluators = 2;
- /**
- * Wake clock is used to schedule periodical job check-ups.
- */
- private final Clock clock;
- /**
- * Job observer on the client.
- * We use it to send results from the driver back to the client.
- */
- private final JobMessageObserver jobMessageObserver;
- /**
- * Job driver uses EvaluatorRequestor
- * to request Evaluators that will run the Tasks.
- */
- private final EvaluatorRequestor evaluatorRequestor;
- /**
- * Static catalog of REEF resources.
- * We use it to schedule Task on every available node.
- */
- private final ResourceCatalog catalog;
- /**
- * Shell execution results from each Evaluator.
- */
- private final List<String> results = new ArrayList<>();
- /**
- * Map from context ID to running evaluator context.
- */
- private final Map<String, ActiveContext> contexts = new HashMap<>();
- private int nCLREvaluator = 1; // guarded by this
- private int nJVMEvaluator = totalEvaluators - nCLREvaluator; // guarded by this
- /**
- * Job driver state.
- */
- private State state = State.INIT;
- /**
- * First command to execute. Sometimes client can send us the first command
- * before Evaluators are available; we need to store this command here.
- */
- private String cmd;
- /**
- * Number of evaluators/tasks to complete.
- */
- private int expectCount = 0;
-
- /**
- * Job driver constructor.
- * All parameters are injected from TANG automatically.
- *
- * @param clock Wake clock to schedule and check up running jobs.
- * @param jobMessageObserver is used to send messages back to the client.
- * @param evaluatorRequestor is used to request Evaluators.
- */
- @Inject
- JobDriver(final Clock clock,
- final JobMessageObserver jobMessageObserver,
- final EvaluatorRequestor evaluatorRequestor,
- final ResourceCatalog catalog) {
- this.clock = clock;
- this.jobMessageObserver = jobMessageObserver;
- this.evaluatorRequestor = evaluatorRequestor;
- this.catalog = catalog;
- }
-
- /**
- * Makes a task configuration for the CLR ShellTask.
- *
- * @param taskId
- * @return task configuration for the CLR Task.
- * @throws BindException
- */
- private static final Configuration getCLRTaskConfiguration(
- final String taskId, final String command) throws BindException {
-
- final ConfigurationBuilder cb = Tang.Factory.getTang()
- .newConfigurationBuilder(loadShellTaskClassHierarchy(SHELL_TASK_CLASS_HIERARCHY_FILENAME));
-
- cb.bind("Org.Apache.Reef.Tasks.ITask, Org.Apache.Reef.Tasks.ITask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=69c3241e6f0468ca", "Org.Apache.Reef.Tasks.ShellTask, Org.Apache.Reef.Tasks.ShellTask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null");
- cb.bind("Org.Apache.Reef.Tasks.TaskConfigurationOptions+Identifier, Org.Apache.Reef.Tasks.ITask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=69c3241e6f0468ca", taskId);
- cb.bind("Org.Apache.Reef.Tasks.ShellTask+Command, Org.Apache.Reef.Tasks.ShellTask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null", command);
-
- return cb.build();
- }
-
- /**
- * Makes a task configuration for the JVM ShellTask..
- *
- * @param taskId
- * @return task configuration for the JVM Task.
- * @throws BindException
- */
- private static final Configuration getJVMTaskConfiguration(
- final String taskId, final String command) throws BindException {
-
- final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
- cb.addConfiguration(
- TaskConfiguration.CONF
- .set(TaskConfiguration.IDENTIFIER, taskId)
- .set(TaskConfiguration.TASK, ShellTask.class)
- .build()
- );
- cb.bindNamedParameter(Command.class, command);
- return cb.build();
- }
-
- /**
- * Loads the class hierarchy.
- *
- * @return
- */
- private static ClassHierarchy loadShellTaskClassHierarchy(String binFile) {
- try (final InputStream chin = new FileInputStream(binFile)) {
- final ClassHierarchyProto.Node root = ClassHierarchyProto.Node.parseFrom(chin);
- final ClassHierarchy ch = new ProtocolBufferClassHierarchy(root);
- return ch;
- } catch (final IOException e) {
- final String message = "Unable to load class hierarchy " + binFile;
- LOG.log(Level.SEVERE, message, e);
- throw new RuntimeException(message, e);
- }
- }
-
- private void submitEvaluator(final AllocatedEvaluator eval, EvaluatorType type) {
- synchronized (JobDriver.this) {
-
- String contextIdSuffix = type.equals(EvaluatorType.JVM) ? JVM_CONTEXT_SUFFIX : CLR_CONTEXT_SUFFIX;
- String contextId = eval.getId() + contextIdSuffix;
-
- eval.setType(type);
-
- LOG.log(Level.INFO, "Allocated Evaluator: {0} expect {1} running {2}",
- new Object[]{eval.getId(), JobDriver.this.expectCount, JobDriver.this.contexts.size()});
- assert (JobDriver.this.state == State.WAIT_EVALUATORS);
- try {
- eval.submitContext(ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER, contextId).build());
- } catch (final BindException ex) {
- LOG.log(Level.SEVERE, "Failed to submit context " + contextId, ex);
- throw new RuntimeException(ex);
- }
- }
- }
-
- /**
- * Submit command to all available evaluators.
- *
- * @param command shell command to execute.
- */
- private void submit(final String command) {
- LOG.log(Level.INFO, "Submit command {0} to {1} evaluators. state: {2}",
- new Object[]{command, this.contexts.size(), this.state});
- assert (this.state == State.READY);
- this.expectCount = this.contexts.size();
- this.state = State.WAIT_TASKS;
- this.cmd = null;
- for (final ActiveContext context : this.contexts.values()) {
- this.submit(context, command);
- }
- }
-
- /**
- * Submit a Task that execute the command to a single Evaluator.
- * This method is called from <code>submitTask(cmd)</code>.
- */
- private void submit(final ActiveContext context, final String command) {
- try {
- LOG.log(Level.INFO, "Sending command {0} to context: {1}", new Object[]{command, context});
- String taskId = context.getId() + "_task";
- final Configuration taskConfiguration;
- if (context.getId().endsWith(JVM_CONTEXT_SUFFIX)) {
- taskConfiguration = getJVMTaskConfiguration(taskId, command);
- } else {
- taskConfiguration = getCLRTaskConfiguration(taskId, command);
- }
- context.submitTask(taskConfiguration);
- } catch (final BindException ex) {
- LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex);
- throw new RuntimeException(ex);
- }
- }
-
- /**
- * Construct the final result and forward it to the Client.
- */
- private void returnResults() {
- final StringBuilder sb = new StringBuilder();
- for (final String result : this.results) {
- sb.append(result);
- }
- this.results.clear();
- LOG.log(Level.INFO, "Return results to the client:\n{0}", sb);
- this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(sb.toString()));
- }
-
- /**
- * Request evaluators on each node.
- * If nodes are not available yet, schedule another request in CHECK_UP_INTERVAL.
- * TODO: Ask for specific nodes. (This is not working in YARN... need to check again at some point.)
- *
- * @throws RuntimeException if any of the requests fails.
- */
- private synchronized void requestEvaluators() {
- assert (this.state == State.INIT);
- final int numNodes = totalEvaluators;
- if (numNodes > 0) {
- LOG.log(Level.INFO, "Schedule on {0} nodes.", numNodes);
- this.evaluatorRequestor.submit(
- EvaluatorRequest.newBuilder()
- .setMemory(128)
- .setNumberOfCores(1)
- .setNumber(numNodes).build()
- );
- this.state = State.WAIT_EVALUATORS;
- this.expectCount = numNodes;
- } else {
- // No nodes available yet - wait and ask again.
- this.clock.scheduleAlarm(CHECK_UP_INTERVAL, new EventHandler<Alarm>() {
- @Override
- public void onNext(final Alarm time) {
- synchronized (JobDriver.this) {
- LOG.log(Level.INFO, "{0} Alarm: {1}", new Object[]{JobDriver.this.state, time});
- if (JobDriver.this.state == State.INIT) {
- JobDriver.this.requestEvaluators();
- }
- }
- }
- });
- }
- }
-
- /**
- * Possible states of the job driver. Can be one of:
- * <dl>
- * <du><code>INIT</code></du><dd>initial state, ready to request the evaluators.</dd>
- * <du><code>WAIT_EVALUATORS</code></du><dd>Wait for requested evaluators to initialize.</dd>
- * <du><code>READY</code></du><dd>Ready to submitTask a new task.</dd>
- * <du><code>WAIT_TASKS</code></du><dd>Wait for tasks to complete.</dd>
- * </dl>
- */
- private enum State {
- INIT, WAIT_EVALUATORS, READY, WAIT_TASKS
- }
-
- /**
- * Handles AllocatedEvaluator: Submit an empty context
- */
- final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
- @Override
- public void onNext(final AllocatedEvaluator allocatedEvaluator) {
- synchronized (JobDriver.this) {
- if (JobDriver.this.nJVMEvaluator > 0) {
- LOG.log(Level.INFO, "===== adding JVM evaluator =====");
- JobDriver.this.submitEvaluator(allocatedEvaluator, EvaluatorType.JVM);
- JobDriver.this.nJVMEvaluator -= 1;
- } else if (JobDriver.this.nCLREvaluator > 0) {
- LOG.log(Level.INFO, "===== adding CLR evaluator =====");
- JobDriver.this.submitEvaluator(allocatedEvaluator, EvaluatorType.CLR);
- JobDriver.this.nCLREvaluator -= 1;
- }
- }
- }
- }
-
- /**
- * Receive notification that a new Context is available.
- * Submit a new Distributed Shell Task to that Context.
- */
- final class ActiveContextHandler implements EventHandler<ActiveContext> {
- @Override
- public void onNext(final ActiveContext context) {
- synchronized (JobDriver.this) {
- LOG.log(Level.INFO, "Context available: {0} expect {1} state {2}",
- new Object[]{context.getId(), JobDriver.this.expectCount, JobDriver.this.state});
- assert (JobDriver.this.state == State.WAIT_EVALUATORS);
- JobDriver.this.contexts.put(context.getId(), context);
- if (--JobDriver.this.expectCount <= 0) {
- JobDriver.this.state = State.READY;
- if (JobDriver.this.cmd == null) {
- LOG.log(Level.INFO, "All evaluators ready; waiting for command. State: {0}",
- JobDriver.this.state);
- } else {
- JobDriver.this.submit(JobDriver.this.cmd);
- }
- }
- }
- }
- }
-
- /**
- * Receive notification that the Context had completed.
- * Remove context from the list of active context.
- */
- final class ClosedContextHandler implements EventHandler<ClosedContext> {
- @Override
- public void onNext(final ClosedContext context) {
- LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
- synchronized (JobDriver.this) {
- JobDriver.this.contexts.remove(context.getId());
- }
- }
- }
-
- /**
- * Receive notification that the Context had failed.
- * Remove context from the list of active context and notify the client.
- */
- final class FailedContextHandler implements EventHandler<FailedContext> {
- @Override
- public void onNext(final FailedContext context) {
- LOG.log(Level.SEVERE, "FailedContext", context);
- synchronized (JobDriver.this) {
- JobDriver.this.contexts.remove(context.getId());
- }
- throw new RuntimeException("Failed context: ", context.asError());
- }
- }
-
- /**
- * Receive notification that the entire Evaluator had failed.
- * Stop other jobs and pass this error to the job observer on the client.
- */
- final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
- @Override
- public void onNext(final FailedEvaluator eval) {
- synchronized (JobDriver.this) {
- LOG.log(Level.SEVERE, "FailedEvaluator", eval);
- for (final FailedContext failedContext : eval.getFailedContextList()) {
- JobDriver.this.contexts.remove(failedContext.getId());
- }
- throw new RuntimeException("Failed Evaluator: ", eval.getEvaluatorException());
- }
- }
- }
-
- /**
- * Receive notification that the Task has completed successfully.
- */
- final class CompletedTaskHandler implements EventHandler<CompletedTask> {
- @Override
- public void onNext(final CompletedTask task) {
- LOG.log(Level.INFO, "Completed task: {0}", task.getId());
- // Take the message returned by the task and add it to the running result.
- String result = "default result";
- try {
- if (task.getId().contains(CLR_CONTEXT_SUFFIX)) {
- result = new String(task.get());
- } else {
- result = JVM_CODEC.decode(task.get());
- }
- } catch (final Exception e) {
- LOG.log(Level.WARNING, "failed to decode task outcome");
- }
- synchronized (JobDriver.this) {
- JobDriver.this.results.add(task.getId() + " :: " + result);
- LOG.log(Level.INFO, "Task {0} result {1}: {2} state: {3}", new Object[]{
- task.getId(), JobDriver.this.results.size(), result, JobDriver.this.state});
- if (--JobDriver.this.expectCount <= 0) {
- JobDriver.this.returnResults();
- JobDriver.this.state = State.READY;
- if (JobDriver.this.cmd != null) {
- JobDriver.this.submit(JobDriver.this.cmd);
- }
- }
- }
- }
- }
-
- /**
- * Receive notification from the client.
- */
- final class ClientMessageHandler implements EventHandler<byte[]> {
- @Override
- public void onNext(final byte[] message) {
- synchronized (JobDriver.this) {
- final String command = JVM_CODEC.decode(message);
- LOG.log(Level.INFO, "Client message: {0} state: {1}",
- new Object[]{command, JobDriver.this.state});
- assert (JobDriver.this.cmd == null);
- if (JobDriver.this.state == State.READY) {
- JobDriver.this.submit(command);
- } else {
- // not ready yet - save the command for better times.
- assert (JobDriver.this.state == State.WAIT_EVALUATORS);
- JobDriver.this.cmd = command;
- }
- }
- }
- }
-
- /**
- * Job Driver is ready and the clock is set up: request the evaluators.
- */
- final class StartHandler implements EventHandler<StartTime> {
- @Override
- public void onNext(final StartTime startTime) {
- LOG.log(Level.INFO, "{0} StartTime: {1}", new Object[]{state, startTime});
- assert (state == State.INIT);
- requestEvaluators();
- }
- }
-
- /**
- * Shutting down the job driver: close the evaluators.
- */
- final class StopHandler implements EventHandler<StopTime> {
- @Override
- public void onNext(final StopTime time) {
- LOG.log(Level.INFO, "{0} StopTime: {1}", new Object[]{state, time});
- for (final ActiveContext context : contexts.values()) {
- context.close();
- }
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/Launch.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/Launch.java b/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/Launch.java
deleted file mode 100644
index 66dbe92..0000000
--- a/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/Launch.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.examples.retained_evalCLR;
-
-import org.apache.reef.client.ClientConfiguration;
-import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
-import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.JavaConfigurationBuilder;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.annotations.Name;
-import org.apache.reef.tang.annotations.NamedParameter;
-import org.apache.reef.tang.exceptions.BindException;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.tang.formats.AvroConfigurationSerializer;
-import org.apache.reef.tang.formats.CommandLine;
-import org.apache.reef.tang.formats.ConfigurationModule;
-import org.apache.reef.tang.formats.OptionalParameter;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Retained Evaluators example - main class.
- */
-public final class Launch {
-
- /**
- * The upper limit on the number of Evaluators that the local resourcemanager will hand out concurrently
- */
- private static final int MAX_NUMBER_OF_EVALUATORS = JobDriver.totalEvaluators;
- /**
- * Standard Java logger
- */
- private static final Logger LOG = Logger.getLogger(Launch.class.getName());
-
- /**
- * This class should not be instantiated.
- */
- private Launch() {
- throw new RuntimeException("Do not instantiate this class!");
- }
-
- /**
- * Parse the command line arguments.
- *
- * @param args command line arguments, as passed to main()
- * @return Configuration object.
- * @throws BindException configuration error.
- * @throws IOException error reading the configuration.
- */
- private static Configuration parseCommandLine(final String[] args)
- throws BindException, IOException {
- final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder();
- final CommandLine cl = new CommandLine(confBuilder);
- cl.registerShortNameOfClass(Local.class);
- cl.registerShortNameOfClass(Command.class);
- cl.registerShortNameOfClass(NumRuns.class);
- cl.processCommandLine(args);
- return confBuilder.build();
- }
-
- private static Configuration cloneCommandLineConfiguration(final Configuration commandLineConf)
- throws InjectionException, BindException {
- final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
- final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
- cb.bindNamedParameter(Command.class, injector.getNamedInstance(Command.class));
- cb.bindNamedParameter(NumRuns.class, String.valueOf(injector.getNamedInstance(NumRuns.class)));
- return cb.build();
- }
-
- /**
- * Parse command line arguments and create TANG configuration ready to be submitted to REEF.
- *
- * @param args Command line arguments, as passed into main().
- * @return (immutable) TANG Configuration object.
- * @throws BindException if configuration commandLineInjector fails.
- * @throws InjectionException if configuration commandLineInjector fails.
- * @throws IOException error reading the configuration.
- */
- private static Configuration getClientConfiguration(final String[] args)
- throws BindException, InjectionException, IOException {
-
- final Configuration commandLineConf = parseCommandLine(args);
-
- final Configuration clientConfiguration = ClientConfiguration.CONF
- .set(ClientConfiguration.ON_JOB_RUNNING, JobClient.RunningJobHandler.class)
- .set(ClientConfiguration.ON_JOB_MESSAGE, JobClient.JobMessageHandler.class)
- .set(ClientConfiguration.ON_JOB_COMPLETED, JobClient.CompletedJobHandler.class)
- .set(ClientConfiguration.ON_JOB_FAILED, JobClient.FailedJobHandler.class)
- .set(ClientConfiguration.ON_RUNTIME_ERROR, JobClient.RuntimeErrorHandler.class)
- .build();
-
- // TODO: Remove the injector, have stuff injected.
- final Injector commandLineInjector = Tang.Factory.getTang().newInjector(commandLineConf);
- final boolean isLocal = commandLineInjector.getNamedInstance(Local.class);
- final Configuration runtimeConfiguration;
- if (isLocal) {
- LOG.log(Level.INFO, "Running on the local runtime");
- runtimeConfiguration = LocalRuntimeConfiguration.CONF
- .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS)
- .build();
- } else {
- LOG.log(Level.INFO, "Running on YARN");
- runtimeConfiguration = YarnClientConfiguration.CONF.build();
- }
-
- return Tang.Factory.getTang()
- .newConfigurationBuilder(runtimeConfiguration, clientConfiguration,
- cloneCommandLineConfiguration(commandLineConf))
- .build();
- }
-
- private static ConfigurationModule addAll(final ConfigurationModule conf, final OptionalParameter<String> param, final File folder) {
- ConfigurationModule result = conf;
- for (final File f : folder.listFiles()) {
- if (f.canRead() && f.exists() && f.isFile()) {
- result = result.set(param, f.getAbsolutePath());
- }
- }
- return result;
- }
-
- /**
- * Main method that starts the Retained Evaluators job.
- *
- * @param args command line parameters.
- */
- public static void main(final String[] args) {
- try {
- final File dotNetFolder = new File(args[0]).getAbsoluteFile();
- String[] removedArgs = Arrays.copyOfRange(args, 1, args.length);
-
- final Configuration config = getClientConfiguration(removedArgs);
- LOG.log(Level.INFO, "Configuration:\n--\n{0}--",
- new AvroConfigurationSerializer().toString(config));
- final Injector injector = Tang.Factory.getTang().newInjector(config);
- final JobClient client = injector.getInstance(JobClient.class);
- client.submit(dotNetFolder);
- client.waitForCompletion();
- LOG.info("Done!");
- } catch (final BindException | InjectionException | IOException ex) {
- LOG.log(Level.SEVERE, "Job configuration error", ex);
- }
- }
-
- /**
- * Command line parameter: a command to run. e.g. "echo Hello REEF"
- */
- @NamedParameter(doc = "The shell command", short_name = "cmd", default_value = "*INTERACTIVE*")
- public static final class Command implements Name<String> {
- }
-
- /**
- * Command line parameter: number of experiments to run.
- */
- @NamedParameter(doc = "Number of times to run the command",
- short_name = "num_runs", default_value = "1")
- public static final class NumRuns implements Name<Integer> {
- }
-
- /**
- * Command line parameter = true to run locally, or false to run on YARN.
- */
- @NamedParameter(doc = "Whether or not to run on the local runtime",
- short_name = "local", default_value = "true")
- public static final class Local implements Name<Boolean> {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/package-info.java b/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/package-info.java
deleted file mode 100644
index 45455bb..0000000
--- a/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * The Retained Evaluators CLR example.
- */
-package org.apache.reef.examples.retained_evalCLR;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/java/reef-examples/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/pom.xml b/lang/java/reef-examples/pom.xml
index 68c4693..4fd9aa1 100644
--- a/lang/java/reef-examples/pom.xml
+++ b/lang/java/reef-examples/pom.xml
@@ -216,65 +216,6 @@ under the License.
</profile>
<profile>
- <id>RetainedEval</id>
- <build>
- <defaultGoal>exec:exec</defaultGoal>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>exec-maven-plugin</artifactId>
- <configuration>
- <executable>java</executable>
- <arguments>
- <argument>-classpath</argument>
- <classpath/>
- <argument>-Djava.util.logging.config.class=org.apache.reef.util.logging.Config
- </argument>
- <argument>-Dcom.microsoft.reef.runtime.local.folder=${project.build.directory}
- </argument>
- <argument>org.apache.reef.examples.retained_eval.Launch</argument>
- <!-- <argument>-cmd</argument>
- <argument>date</argument>
- <argument>-num_runs</argument>
- <argument>20</argument>
- <argument>-local</argument>
- <argument>true</argument> -->
- </arguments>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
- <id>RetainedEval_yarn</id>
- <build>
- <defaultGoal>exec:exec</defaultGoal>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>exec-maven-plugin</artifactId>
- <configuration>
- <executable>java</executable>
- <arguments>
- <argument>-classpath</argument>
- <classpath/>
- <argument>-Djava.util.logging.config.class=org.apache.reef.util.logging.Config
- </argument>
- <argument>org.apache.reef.examples.retained_eval.Launch</argument>
- <argument>-cmd</argument>
- <argument>date</argument>
- <argument>-num_runs</argument>
- <argument>20</argument>
- <argument>-local</argument>
- <argument>false</argument>
- </arguments>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
-
- <profile>
<id>SuspendDemo</id>
<build>
<defaultGoal>exec:exec</defaultGoal>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobClient.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobClient.java
deleted file mode 100644
index 61d549d..0000000
--- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobClient.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.examples.retained_eval;
-
-import org.apache.reef.client.*;
-import org.apache.reef.examples.library.Command;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.JavaConfigurationBuilder;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.annotations.NamedParameter;
-import org.apache.reef.tang.annotations.Parameter;
-import org.apache.reef.tang.annotations.Unit;
-import org.apache.reef.tang.exceptions.BindException;
-import org.apache.reef.util.EnvironmentUtils;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
-
-import javax.inject.Inject;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Retained Evaluator Shell Client.
- */
-@Unit
-public class JobClient {
-
- /**
- * Standard java logger.
- */
- private static final Logger LOG = Logger.getLogger(JobClient.class.getName());
-
- /**
- * Codec to translate messages to and from the job driver
- */
- private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
-
- /**
- * Reference to the REEF framework.
- * This variable is injected automatically in the constructor.
- */
- private final REEF reef;
-
- /**
- * Shell command to submitTask to the job driver.
- */
- private final String command;
-
- /**
- * Job Driver configuration.
- */
- private final Configuration driverConfiguration;
-
- /**
- * If true, take commands from stdin; otherwise, use -cmd parameter in batch mode.
- */
- private final boolean isInteractive;
-
- /**
- * Total number of experiments to run.
- */
- private final int maxRuns;
-
- /**
- * Command prompt reader for the interactive mode (stdin).
- */
- private final BufferedReader prompt;
-
- /**
- * A reference to the running job that allows client to send messages back to the job driver
- */
- private RunningJob runningJob;
-
- /**
- * Start timestamp of the current task.
- */
- private long startTime = 0;
-
- /**
- * Total time spent performing tasks in Evaluators.
- */
- private long totalTime = 0;
-
- /**
- * Number of experiments ran so far.
- */
- private int numRuns = 0;
-
- /**
- * Set to false when job driver is done.
- */
- private boolean isBusy = true;
-
- /**
- * Last result returned from the job driver.
- */
- private String lastResult;
-
- /**
- * Retained Evaluator client.
- * Parameters are injected automatically by TANG.
- *
- * @param command Shell command to run on each Evaluator.
- * @param reef Reference to the REEF framework.
- */
- @Inject
- JobClient(final REEF reef,
- @Parameter(Command.class) final String command,
- @Parameter(Launch.NumRuns.class) final Integer numRuns,
- @Parameter(Launch.NumEval.class) final Integer numEvaluators) throws BindException {
-
- this.reef = reef;
- this.command = command;
- this.maxRuns = numRuns;
-
- // If command is not set, switch to interactive mode. (Yes, we compare pointers here)
- this.isInteractive = this.command ==
- Command.class.getAnnotation(NamedParameter.class).default_value();
-
- this.prompt = this.isInteractive ?
- new BufferedReader(new InputStreamReader(System.in)) : null;
-
- final JavaConfigurationBuilder configBuilder = Tang.Factory.getTang().newConfigurationBuilder();
- configBuilder.addConfiguration(
- DriverConfiguration.CONF
- .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(JobDriver.class))
- .set(DriverConfiguration.DRIVER_IDENTIFIER, "eval-" + System.currentTimeMillis())
- .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, JobDriver.AllocatedEvaluatorHandler.class)
- .set(DriverConfiguration.ON_EVALUATOR_FAILED, JobDriver.FailedEvaluatorHandler.class)
- .set(DriverConfiguration.ON_CONTEXT_ACTIVE, JobDriver.ActiveContextHandler.class)
- .set(DriverConfiguration.ON_CONTEXT_CLOSED, JobDriver.ClosedContextHandler.class)
- .set(DriverConfiguration.ON_CONTEXT_FAILED, JobDriver.FailedContextHandler.class)
- .set(DriverConfiguration.ON_TASK_COMPLETED, JobDriver.CompletedTaskHandler.class)
- .set(DriverConfiguration.ON_CLIENT_MESSAGE, JobDriver.ClientMessageHandler.class)
- .set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class)
- .set(DriverConfiguration.ON_DRIVER_STOP, JobDriver.StopHandler.class)
- .build()
- );
- configBuilder.bindNamedParameter(Launch.NumEval.class, "" + numEvaluators);
- this.driverConfiguration = configBuilder.build();
- }
-
- /**
- * @return a Configuration binding the ClientConfiguration.* event handlers to this Client.
- */
- public static Configuration getClientConfiguration() {
- return ClientConfiguration.CONF
- .set(ClientConfiguration.ON_JOB_RUNNING, JobClient.RunningJobHandler.class)
- .set(ClientConfiguration.ON_JOB_MESSAGE, JobClient.JobMessageHandler.class)
- .set(ClientConfiguration.ON_JOB_COMPLETED, JobClient.CompletedJobHandler.class)
- .set(ClientConfiguration.ON_JOB_FAILED, JobClient.FailedJobHandler.class)
- .set(ClientConfiguration.ON_RUNTIME_ERROR, JobClient.RuntimeErrorHandler.class)
- .build();
- }
-
- /**
- * Launch the job driver.
- *
- * @throws BindException configuration error.
- */
- public void submit() {
- this.reef.submit(this.driverConfiguration);
- }
-
- /**
- * Send command to the job driver. Record timestamp when the command was sent.
- * If this.command is set, use it; otherwise, ask user for the command.
- */
- private synchronized void submitTask() {
- if (this.isInteractive) {
- String cmd;
- try {
- do {
- System.out.print("\nRE> ");
- cmd = this.prompt.readLine();
- } while (cmd != null && cmd.trim().isEmpty());
- } catch (final IOException ex) {
- LOG.log(Level.FINE, "Error reading from stdin: {0}", ex);
- cmd = null;
- }
- if (cmd == null || cmd.equals("exit")) {
- this.runningJob.close();
- stopAndNotify();
- } else {
- this.submitTask(cmd);
- }
- } else {
- // non-interactive batch mode:
- this.submitTask(this.command);
- }
- }
-
- /**
- * Send command to the job driver. Record timestamp when the command was sent.
- *
- * @param cmd shell command to execute in all Evaluators.
- */
- private synchronized void submitTask(final String cmd) {
- LOG.log(Level.FINE, "Submit task {0} \"{1}\" to {2}",
- new Object[]{this.numRuns + 1, cmd, this.runningJob});
- this.startTime = System.currentTimeMillis();
- this.runningJob.send(CODEC.encode(cmd));
- }
-
- /**
- * Notify the process in waitForCompletion() method that the main process has finished.
- */
- private synchronized void stopAndNotify() {
- this.runningJob = null;
- this.isBusy = false;
- this.notify();
- }
-
- /**
- * Wait for the job driver to complete. This method is called from Launch.main()
- */
- public String waitForCompletion() {
- while (this.isBusy) {
- LOG.log(Level.FINE, "Waiting for the Job Driver to complete.");
- try {
- synchronized (this) {
- this.wait();
- }
- } catch (final InterruptedException ex) {
- LOG.log(Level.WARNING, "Waiting for result interrupted.", ex);
- }
- }
- return this.lastResult;
- }
-
- public void close() {
- this.reef.close();
- }
-
- /**
- * Receive notification from the job driver that the job is running.
- */
- final class RunningJobHandler implements EventHandler<RunningJob> {
- @Override
- public void onNext(final RunningJob job) {
- LOG.log(Level.FINE, "Running job: {0}", job.getId());
- synchronized (JobClient.this) {
- JobClient.this.runningJob = job;
- JobClient.this.submitTask();
- }
- }
- }
-
- /**
- * Receive message from the job driver.
- * There is only one message, which comes at the end of the driver execution
- * and contains shell command output on each node.
- */
- final class JobMessageHandler implements EventHandler<JobMessage> {
- @Override
- public void onNext(final JobMessage message) {
- synchronized (JobClient.this) {
-
- lastResult = CODEC.decode(message.get());
- final long jobTime = System.currentTimeMillis() - startTime;
- totalTime += jobTime;
- ++numRuns;
-
- LOG.log(Level.FINE, "TIME: Task {0} completed in {1} msec.:\n{2}",
- new Object[]{"" + numRuns, "" + jobTime, lastResult});
-
- System.out.println(lastResult);
-
- if (runningJob != null) {
- if (isInteractive || numRuns < maxRuns) {
- submitTask();
- } else {
- LOG.log(Level.INFO,
- "All {0} tasks complete; Average task time: {1}. Closing the job driver.",
- new Object[]{maxRuns, totalTime / (double) maxRuns});
- runningJob.close();
- stopAndNotify();
- }
- }
- }
- }
- }
-
- /**
- * Receive notification from the job driver that the job had failed.
- */
- final class FailedJobHandler implements EventHandler<FailedJob> {
- @Override
- public void onNext(final FailedJob job) {
- LOG.log(Level.SEVERE, "Failed job: " + job.getId(), job.getReason().orElse(null));
- stopAndNotify();
- }
- }
-
- /**
- * Receive notification from the job driver that the job had completed successfully.
- */
- final class CompletedJobHandler implements EventHandler<CompletedJob> {
- @Override
- public void onNext(final CompletedJob job) {
- LOG.log(Level.FINE, "Completed job: {0}", job.getId());
- stopAndNotify();
- }
- }
-
- /**
- * Receive notification that there was an exception thrown from the job driver.
- */
- final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
- @Override
- public void onNext(final FailedRuntime error) {
- LOG.log(Level.SEVERE, "Error in job driver: " + error, error.getReason().orElse(null));
- stopAndNotify();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobDriver.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobDriver.java
deleted file mode 100644
index b2b2055..0000000
--- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobDriver.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.examples.retained_eval;
-
-import org.apache.reef.driver.client.JobMessageObserver;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.context.ClosedContext;
-import org.apache.reef.driver.context.ContextConfiguration;
-import org.apache.reef.driver.context.FailedContext;
-import org.apache.reef.driver.evaluator.AllocatedEvaluator;
-import org.apache.reef.driver.evaluator.EvaluatorRequest;
-import org.apache.reef.driver.evaluator.EvaluatorRequestor;
-import org.apache.reef.driver.evaluator.FailedEvaluator;
-import org.apache.reef.driver.task.CompletedTask;
-import org.apache.reef.driver.task.TaskConfiguration;
-import org.apache.reef.examples.library.Command;
-import org.apache.reef.examples.library.ShellTask;
-import org.apache.reef.tang.JavaConfigurationBuilder;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.annotations.Parameter;
-import org.apache.reef.tang.annotations.Unit;
-import org.apache.reef.tang.exceptions.BindException;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
-import org.apache.reef.wake.time.event.StartTime;
-import org.apache.reef.wake.time.event.StopTime;
-
-import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Retained Evaluator example job driver. Execute shell command on all evaluators,
- * capture stdout, and return concatenated results back to the client.
- */
-@Unit
-public final class JobDriver {
- /**
- * Standard Java logger.
- */
- private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());
-
- /**
- * Duration of one clock interval.
- */
- private static final int CHECK_UP_INTERVAL = 1000; // 1 sec.
-
- /**
- * String codec is used to encode the results
- * before passing them back to the client.
- */
- private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
- /**
- * Job observer on the client.
- * We use it to send results from the driver back to the client.
- */
- private final JobMessageObserver jobMessageObserver;
- /**
- * Job driver uses EvaluatorRequestor
- * to request Evaluators that will run the Tasks.
- */
- private final EvaluatorRequestor evaluatorRequestor;
- /**
- * Number of Evalutors to request (default is 1).
- */
- private final int numEvaluators;
- /**
- * Shell execution results from each Evaluator.
- */
- private final List<String> results = new ArrayList<>();
- /**
- * Map from context ID to running evaluator context.
- */
- private final Map<String, ActiveContext> contexts = new HashMap<>();
- /**
- * Job driver state.
- */
- private State state = State.INIT;
- /**
- * First command to execute. Sometimes client can send us the first command
- * before Evaluators are available; we need to store this command here.
- */
- private String cmd;
- /**
- * Number of evaluators/tasks to complete.
- */
- private int expectCount = 0;
-
- /**
- * Job driver constructor.
- * All parameters are injected from TANG automatically.
- *
- * @param jobMessageObserver is used to send messages back to the client.
- * @param evaluatorRequestor is used to request Evaluators.
- */
- @Inject
- JobDriver(final JobMessageObserver jobMessageObserver,
- final EvaluatorRequestor evaluatorRequestor,
- final @Parameter(Launch.NumEval.class) Integer numEvaluators) {
- this.jobMessageObserver = jobMessageObserver;
- this.evaluatorRequestor = evaluatorRequestor;
- this.numEvaluators = numEvaluators;
- }
-
- /**
- * Construct the final result and forward it to the Client.
- */
- private void returnResults() {
- final StringBuilder sb = new StringBuilder();
- for (final String result : this.results) {
- sb.append(result);
- }
- this.results.clear();
- LOG.log(Level.INFO, "Return results to the client:\n{0}", sb);
- this.jobMessageObserver.sendMessageToClient(CODEC.encode(sb.toString()));
- }
-
- /**
- * Submit command to all available evaluators.
- *
- * @param command shell command to execute.
- */
- private void submit(final String command) {
- LOG.log(Level.INFO, "Submit command {0} to {1} evaluators. state: {2}",
- new Object[]{command, this.contexts.size(), this.state});
- assert (this.state == State.READY);
- this.expectCount = this.contexts.size();
- this.state = State.WAIT_TASKS;
- this.cmd = null;
- for (final ActiveContext context : this.contexts.values()) {
- this.submit(context, command);
- }
- }
-
- /**
- * Submit a Task that execute the command to a single Evaluator.
- * This method is called from <code>submitTask(cmd)</code>.
- */
- private void submit(final ActiveContext context, final String command) {
- try {
- LOG.log(Level.INFO, "Send command {0} to context: {1}", new Object[]{command, context});
- final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
- cb.addConfiguration(
- TaskConfiguration.CONF
- .set(TaskConfiguration.IDENTIFIER, context.getId() + "_task")
- .set(TaskConfiguration.TASK, ShellTask.class)
- .build()
- );
- cb.bindNamedParameter(Command.class, command);
- context.submitTask(cb.build());
- } catch (final BindException ex) {
- LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex);
- context.close();
- throw new RuntimeException(ex);
- }
- }
-
- /**
- * Request the evaluators.
- */
- private synchronized void requestEvaluators() {
- assert (this.state == State.INIT);
- LOG.log(Level.INFO, "Schedule on {0} Evaluators.", this.numEvaluators);
- this.evaluatorRequestor.submit(
- EvaluatorRequest.newBuilder()
- .setMemory(128)
- .setNumberOfCores(1)
- .setNumber(this.numEvaluators).build()
- );
- this.state = State.WAIT_EVALUATORS;
- this.expectCount = this.numEvaluators;
- }
-
- /**
- * Possible states of the job driver. Can be one of:
- * <dl>
- * <du><code>INIT</code></du><dd>initial state, ready to request the evaluators.</dd>
- * <du><code>WAIT_EVALUATORS</code></du><dd>Wait for requested evaluators to initialize.</dd>
- * <du><code>READY</code></du><dd>Ready to submitTask a new task.</dd>
- * <du><code>WAIT_TASKS</code></du><dd>Wait for tasks to complete.</dd>
- * </dl>
- */
- private enum State {
- INIT, WAIT_EVALUATORS, READY, WAIT_TASKS
- }
-
- /**
- * Receive notification that an Evaluator had been allocated,
- * and submitTask a new Task in that Evaluator.
- */
- final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
- @Override
- public void onNext(final AllocatedEvaluator eval) {
- synchronized (JobDriver.this) {
- LOG.log(Level.INFO, "Allocated Evaluator: {0} expect {1} running {2}",
- new Object[]{eval.getId(), JobDriver.this.expectCount, JobDriver.this.contexts.size()});
- assert (JobDriver.this.state == State.WAIT_EVALUATORS);
- try {
- eval.submitContext(ContextConfiguration.CONF.set(
- ContextConfiguration.IDENTIFIER, eval.getId() + "_context").build());
- } catch (final BindException ex) {
- LOG.log(Level.SEVERE, "Failed to submit a context to evaluator: " + eval.getId(), ex);
- throw new RuntimeException(ex);
- }
- }
- }
- }
-
- /**
- * Receive notification that the entire Evaluator had failed.
- * Stop other jobs and pass this error to the job observer on the client.
- */
- final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
- @Override
- public void onNext(final FailedEvaluator eval) {
- synchronized (JobDriver.this) {
- LOG.log(Level.SEVERE, "FailedEvaluator", eval);
- for (final FailedContext failedContext : eval.getFailedContextList()) {
- JobDriver.this.contexts.remove(failedContext.getId());
- }
- throw new RuntimeException("Failed Evaluator: ", eval.getEvaluatorException());
- }
- }
- }
-
- /**
- * Receive notification that a new Context is available.
- * Submit a new Distributed Shell Task to that Context.
- */
- final class ActiveContextHandler implements EventHandler<ActiveContext> {
- @Override
- public void onNext(final ActiveContext context) {
- synchronized (JobDriver.this) {
- LOG.log(Level.INFO, "Context available: {0} expect {1} state {2}",
- new Object[]{context.getId(), JobDriver.this.expectCount, JobDriver.this.state});
- assert (JobDriver.this.state == State.WAIT_EVALUATORS);
- JobDriver.this.contexts.put(context.getId(), context);
- if (--JobDriver.this.expectCount <= 0) {
- JobDriver.this.state = State.READY;
- if (JobDriver.this.cmd == null) {
- LOG.log(Level.INFO, "All evaluators ready; waiting for command. State: {0}",
- JobDriver.this.state);
- } else {
- JobDriver.this.submit(JobDriver.this.cmd);
- }
- }
- }
- }
- }
-
- /**
- * Receive notification that the Context had completed.
- * Remove context from the list of active context.
- */
- final class ClosedContextHandler implements EventHandler<ClosedContext> {
- @Override
- public void onNext(final ClosedContext context) {
- LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
- synchronized (JobDriver.this) {
- JobDriver.this.contexts.remove(context.getId());
- }
- }
- }
-
- /**
- * Receive notification that the Context had failed.
- * Remove context from the list of active context and notify the client.
- */
- final class FailedContextHandler implements EventHandler<FailedContext> {
- @Override
- public void onNext(final FailedContext context) {
- LOG.log(Level.SEVERE, "FailedContext", context);
- synchronized (JobDriver.this) {
- JobDriver.this.contexts.remove(context.getId());
- }
- throw new RuntimeException("Failed context: ", context.asError());
- }
- }
-
- /**
- * Receive notification that the Task has completed successfully.
- */
- final class CompletedTaskHandler implements EventHandler<CompletedTask> {
- @Override
- public void onNext(final CompletedTask task) {
- LOG.log(Level.INFO, "Completed task: {0}", task.getId());
- // Take the message returned by the task and add it to the running result.
- final String result = CODEC.decode(task.get());
- synchronized (JobDriver.this) {
- JobDriver.this.results.add(task.getId() + " :: " + result);
- LOG.log(Level.INFO, "Task {0} result {1}: {2} state: {3}", new Object[]{
- task.getId(), JobDriver.this.results.size(), result, JobDriver.this.state});
- if (--JobDriver.this.expectCount <= 0) {
- JobDriver.this.returnResults();
- JobDriver.this.state = State.READY;
- if (JobDriver.this.cmd != null) {
- JobDriver.this.submit(JobDriver.this.cmd);
- }
- }
- }
- }
- }
-
- /**
- * Receive notification from the client.
- */
- final class ClientMessageHandler implements EventHandler<byte[]> {
- @Override
- public void onNext(final byte[] message) {
- synchronized (JobDriver.this) {
- final String command = CODEC.decode(message);
- LOG.log(Level.INFO, "Client message: {0} state: {1}",
- new Object[]{command, JobDriver.this.state});
- assert (JobDriver.this.cmd == null);
- if (JobDriver.this.state == State.READY) {
- JobDriver.this.submit(command);
- } else {
- // not ready yet - save the command for better times.
- assert (JobDriver.this.state == State.WAIT_EVALUATORS);
- JobDriver.this.cmd = command;
- }
- }
- }
- }
-
- /**
- * Job Driver is ready and the clock is set up: request the evaluators.
- */
- final class StartHandler implements EventHandler<StartTime> {
- @Override
- public void onNext(final StartTime startTime) {
- LOG.log(Level.INFO, "{0} StartTime: {1}", new Object[]{state, startTime});
- assert (state == State.INIT);
- requestEvaluators();
- }
- }
-
- /**
- * Shutting down the job driver: close the evaluators.
- */
- final class StopHandler implements EventHandler<StopTime> {
- @Override
- public void onNext(final StopTime time) {
- LOG.log(Level.INFO, "{0} StopTime: {1}", new Object[]{state, time});
- for (final ActiveContext context : contexts.values()) {
- context.close();
- }
- }
- }
-}