You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/07/30 02:13:48 UTC
incubator-reef git commit: [REEF-514] Move Driver restart
configuration to a separate configuration
Repository: incubator-reef
Updated Branches:
refs/heads/master 88c63e456 -> aee7bebf5
[REEF-514] Move Driver restart configuration to a separate configuration
This addressed the issue by
* Moving configurations for restarts to a separate
ConfigurationModule.
* Refactoring restart handlers into restart-specific configuration
modules that are runtime-dependent.
JIRA:
[REEF-514](https://issues.apache.org/jira/browse/REEF-514)
Pull Request:
This closes #319
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/aee7bebf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/aee7bebf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/aee7bebf
Branch: refs/heads/master
Commit: aee7bebf547012f73cd9af538f5dec6df6560ed0
Parents: 88c63e4
Author: Andrew Chung <af...@gmail.com>
Authored: Tue Jul 21 15:46:55 2015 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Wed Jul 29 16:36:03 2015 -0700
----------------------------------------------------------------------
.../apache/reef/bridge/client/Constants.java | 17 ++--
.../bridge/client/YarnJobSubmissionClient.java | 38 +++++++--
.../reef/javabridge/generic/JobClient.java | 34 +++++---
.../apache/reef/javabridge/generic/Launch.java | 5 +-
.../apache/reef/client/DriverConfiguration.java | 27 -------
.../reef/client/DriverRestartConfiguration.java | 75 +++++++++++++++++
.../reef/client/DriverServiceConfiguration.java | 14 +---
.../driver/parameters/DriverRestartHandler.java | 4 +-
.../ServiceDriverRestartedHandlers.java | 35 ++++++++
.../DriverRuntimeRestartConfiguration.java | 38 +++++++++
.../common/driver/DriverStartHandler.java | 16 ++--
.../reef/examples/hello/HelloDriverRestart.java | 45 +++++++++++
.../reef/examples/hello/HelloREEFYarn.java | 4 +-
.../examples/hello/HelloREEFYarnRestart.java | 85 ++++++++++++++++++++
.../driver/YarnDriverRestartConfiguration.java | 43 ++++++++++
.../reef/webserver/ReefEventStateManager.java | 13 ++-
16 files changed, 416 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aee7bebf/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/Constants.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/Constants.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/Constants.java
index 7bd9c1d..e5b397c 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/Constants.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/Constants.java
@@ -20,6 +20,7 @@ package org.apache.reef.bridge.client;
import org.apache.reef.client.DriverConfiguration;
import org.apache.reef.client.DriverServiceConfiguration;
+import org.apache.reef.client.DriverRestartConfiguration;
import org.apache.reef.io.network.naming.NameServerConfiguration;
import org.apache.reef.javabridge.generic.JobDriver;
import org.apache.reef.tang.Configuration;
@@ -40,18 +41,14 @@ public final class Constants {
.set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, JobDriver.AllocatedEvaluatorHandler.class)
.set(DriverConfiguration.ON_EVALUATOR_FAILED, JobDriver.FailedEvaluatorHandler.class)
.set(DriverConfiguration.ON_CONTEXT_ACTIVE, JobDriver.ActiveContextHandler.class)
- .set(DriverConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE, JobDriver.DriverRestartActiveContextHandler.class)
.set(DriverConfiguration.ON_CONTEXT_CLOSED, JobDriver.ClosedContextHandler.class)
.set(DriverConfiguration.ON_CONTEXT_FAILED, JobDriver.FailedContextHandler.class)
.set(DriverConfiguration.ON_CONTEXT_MESSAGE, JobDriver.ContextMessageHandler.class)
.set(DriverConfiguration.ON_TASK_MESSAGE, JobDriver.TaskMessageHandler.class)
.set(DriverConfiguration.ON_TASK_FAILED, JobDriver.FailedTaskHandler.class)
.set(DriverConfiguration.ON_TASK_RUNNING, JobDriver.RunningTaskHandler.class)
- .set(DriverConfiguration.ON_DRIVER_RESTART_TASK_RUNNING, JobDriver.DriverRestartRunningTaskHandler.class)
- .set(DriverConfiguration.ON_DRIVER_RESTART_COMPLETED, JobDriver.DriverRestartCompletedHandler.class)
.set(DriverConfiguration.ON_TASK_COMPLETED, JobDriver.CompletedTaskHandler.class)
.set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class)
- .set(DriverConfiguration.ON_DRIVER_RESTARTED, JobDriver.RestartHandler.class)
.set(DriverConfiguration.ON_TASK_SUSPENDED, JobDriver.SuspendedTaskHandler.class)
.set(DriverConfiguration.ON_EVALUATOR_COMPLETED, JobDriver.CompletedEvaluatorHandler.class)
.build();
@@ -67,13 +64,17 @@ public final class Constants {
.set(DriverServiceConfiguration.ON_EVALUATOR_ALLOCATED,
ReefEventStateManager.AllocatedEvaluatorStateHandler.class)
.set(DriverServiceConfiguration.ON_CONTEXT_ACTIVE, ReefEventStateManager.ActiveContextStateHandler.class)
- .set(DriverServiceConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE,
- ReefEventStateManager.DrivrRestartActiveContextStateHandler.class)
.set(DriverServiceConfiguration.ON_TASK_RUNNING, ReefEventStateManager.TaskRunningStateHandler.class)
- .set(DriverServiceConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
- ReefEventStateManager.DriverRestartTaskRunningStateHandler.class)
.set(DriverServiceConfiguration.ON_DRIVER_STARTED, ReefEventStateManager.StartStateHandler.class)
.set(DriverServiceConfiguration.ON_DRIVER_STOP, ReefEventStateManager.StopStateHandler.class)
+ .build(),
+ DriverRestartConfiguration.CONF
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED,
+ ReefEventStateManager.DriverRestartHandler.class)
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE,
+ ReefEventStateManager.DriverRestartActiveContextStateHandler.class)
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
+ ReefEventStateManager.DriverRestartTaskRunningStateHandler.class)
.build()
);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aee7bebf/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index bb47834..e707549 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -22,7 +22,9 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.client.DriverRestartConfiguration;
import org.apache.reef.io.TcpPortConfigurationProvider;
+import org.apache.reef.javabridge.generic.JobDriver;
import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
import org.apache.reef.runtime.common.files.ClasspathProvider;
import org.apache.reef.runtime.common.files.REEFFileNames;
@@ -31,6 +33,7 @@ import org.apache.reef.runtime.yarn.client.YarnSubmissionHelper;
import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
import org.apache.reef.runtime.yarn.client.uploader.JobUploader;
import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
+import org.apache.reef.runtime.yarn.driver.YarnDriverRestartConfiguration;
import org.apache.reef.tang.*;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.ConfigurationSerializer;
@@ -72,17 +75,36 @@ public final class YarnJobSubmissionClient {
this.classpath = classpath;
}
- private void addJVMConfiguration(final File driverFolder, final String jobId, final String jobSubmissionFolder)
+ private void addYarnDriverConfiguration(final File driverFolder, final String jobId, final String jobSubmissionFolder)
throws IOException {
final File driverConfigurationFile = new File(driverFolder, this.fileNames.getDriverConfigurationPath());
+ final Configuration yarnDriverConfiguration = YarnDriverConfiguration.CONF
+ .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, jobSubmissionFolder)
+ .set(YarnDriverConfiguration.JOB_IDENTIFIER, jobId)
+ .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE)
+ .set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0)
+ .build();
+
+ final Configuration yarnDriverRestartConfiguration =
+ YarnDriverRestartConfiguration.CONF
+ .build();
+
+ final Configuration driverRestartConfiguration =
+ DriverRestartConfiguration.CONF
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED, JobDriver.RestartHandler.class)
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE,
+ JobDriver.DriverRestartActiveContextHandler.class)
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
+ JobDriver.DriverRestartRunningTaskHandler.class)
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED,
+ JobDriver.DriverRestartCompletedHandler.class)
+ .build();
+
final Configuration driverConfiguration = Configurations.merge(
Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER,
- YarnDriverConfiguration.CONF
- .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, jobSubmissionFolder)
- .set(YarnDriverConfiguration.JOB_IDENTIFIER, jobId)
- .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE)
- .set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0)
- .build());
+ yarnDriverConfiguration,
+ yarnDriverRestartConfiguration,
+ driverRestartConfiguration);
this.configurationSerializer.toFile(driverConfiguration, driverConfigurationFile);
}
@@ -132,7 +154,7 @@ public final class YarnJobSubmissionClient {
// ------------------------------------------------------------------------
// Prepare the JAR
final JobFolder jobFolderOnDFS = this.uploader.createJobFolder(submissionHelper.getApplicationId());
- this.addJVMConfiguration(driverFolder, jobId, jobFolderOnDFS.getPath().toString());
+ this.addYarnDriverConfiguration(driverFolder, jobId, jobFolderOnDFS.getPath().toString());
final File jarFile = makeJar(driverFolder);
LOG.log(Level.INFO, "Created job submission jar file: {0}", jarFile);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aee7bebf/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobClient.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobClient.java
index ba0d46a..057a4f9 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobClient.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobClient.java
@@ -19,8 +19,10 @@
package org.apache.reef.javabridge.generic;
import org.apache.reef.client.*;
+import org.apache.reef.client.DriverRestartConfiguration;
import org.apache.reef.io.network.naming.NameServerConfiguration;
import org.apache.reef.javabridge.NativeInterop;
+import org.apache.reef.runtime.yarn.driver.YarnDriverRestartConfiguration;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Configurations;
import org.apache.reef.tang.annotations.Unit;
@@ -105,18 +107,14 @@ public class JobClient {
.set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, JobDriver.AllocatedEvaluatorHandler.class)
.set(DriverConfiguration.ON_EVALUATOR_FAILED, JobDriver.FailedEvaluatorHandler.class)
.set(DriverConfiguration.ON_CONTEXT_ACTIVE, JobDriver.ActiveContextHandler.class)
- .set(DriverConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE, JobDriver.DriverRestartActiveContextHandler.class)
.set(DriverConfiguration.ON_CONTEXT_CLOSED, JobDriver.ClosedContextHandler.class)
.set(DriverConfiguration.ON_CONTEXT_FAILED, JobDriver.FailedContextHandler.class)
.set(DriverConfiguration.ON_CONTEXT_MESSAGE, JobDriver.ContextMessageHandler.class)
.set(DriverConfiguration.ON_TASK_MESSAGE, JobDriver.TaskMessageHandler.class)
.set(DriverConfiguration.ON_TASK_FAILED, JobDriver.FailedTaskHandler.class)
.set(DriverConfiguration.ON_TASK_RUNNING, JobDriver.RunningTaskHandler.class)
- .set(DriverConfiguration.ON_DRIVER_RESTART_TASK_RUNNING, JobDriver.DriverRestartRunningTaskHandler.class)
- .set(DriverConfiguration.ON_DRIVER_RESTART_COMPLETED, JobDriver.DriverRestartCompletedHandler.class)
.set(DriverConfiguration.ON_TASK_COMPLETED, JobDriver.CompletedTaskHandler.class)
.set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class)
- .set(DriverConfiguration.ON_DRIVER_RESTARTED, JobDriver.RestartHandler.class)
.set(DriverConfiguration.ON_TASK_SUSPENDED, JobDriver.SuspendedTaskHandler.class)
.set(DriverConfiguration.ON_EVALUATOR_COMPLETED, JobDriver.CompletedEvaluatorHandler.class);
}
@@ -139,17 +137,30 @@ public class JobClient {
.set(DriverServiceConfiguration.ON_EVALUATOR_ALLOCATED,
ReefEventStateManager.AllocatedEvaluatorStateHandler.class)
.set(DriverServiceConfiguration.ON_CONTEXT_ACTIVE, ReefEventStateManager.ActiveContextStateHandler.class)
- .set(DriverServiceConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE,
- ReefEventStateManager.DrivrRestartActiveContextStateHandler.class)
.set(DriverServiceConfiguration.ON_TASK_RUNNING, ReefEventStateManager.TaskRunningStateHandler.class)
- .set(DriverServiceConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
- ReefEventStateManager.DriverRestartTaskRunningStateHandler.class)
.set(DriverServiceConfiguration.ON_DRIVER_STARTED, ReefEventStateManager.StartStateHandler.class)
.set(DriverServiceConfiguration.ON_DRIVER_STOP, ReefEventStateManager.StopStateHandler.class)
.build();
+
return Configurations.merge(httpHandlerConfiguration, driverConfigurationForHttpServer);
}
+ public static Configuration getYarnConfiguration() {
+ final Configuration yarnDriverRestartConfiguration = YarnDriverRestartConfiguration.CONF
+ .build();
+
+ final Configuration driverRestartHandlerConfigurations = DriverRestartConfiguration.CONF
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED,
+ ReefEventStateManager.DriverRestartHandler.class)
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
+ ReefEventStateManager.DriverRestartTaskRunningStateHandler.class)
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE,
+ ReefEventStateManager.DriverRestartActiveContextStateHandler.class)
+ .build();
+
+ return Configurations.merge(yarnDriverRestartConfiguration, driverRestartHandlerConfigurations);
+ }
+
public void addCLRFiles(final File folder) throws BindException {
try (final LoggingScope ls = this.loggingScopeFactory.getNewLoggingScope("JobClient::addCLRFiles")) {
ConfigurationModule result = this.driverConfigModule;
@@ -194,8 +205,13 @@ public class JobClient {
*
* @throws org.apache.reef.tang.exceptions.BindException configuration error.
*/
- public void submit(final File clrFolder, final boolean submitDriver, final Configuration clientConfig) {
+ public void submit(final File clrFolder, final boolean submitDriver,
+ final boolean local, final Configuration clientConfig) {
try (final LoggingScope ls = this.loggingScopeFactory.driverSubmit(submitDriver)) {
+ if (!local) {
+ this.driverConfiguration = Configurations.merge(this.driverConfiguration, this.getYarnConfiguration());
+ }
+
try {
addCLRFiles(clrFolder);
} catch (final BindException e) {
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aee7bebf/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java
index 4392758..e67176d 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java
@@ -153,6 +153,7 @@ public final class Launch {
final Injector commandLineInjector = Tang.Factory.getTang().newInjector(parseCommandLine(removedArgs));
final int waitTime = commandLineInjector.getNamedInstance(WaitTimeForDriver.class);
final int driverMemory = commandLineInjector.getNamedInstance(DriverMemoryInMb.class);
+ final boolean isLocal = commandLineInjector.getNamedInstance(Local.class);
final String driverIdentifier = commandLineInjector.getNamedInstance(DriverIdentifier.class);
final String jobSubmissionDirectory = commandLineInjector.getNamedInstance(DriverJobSubmissionDirectory.class);
final boolean submit = commandLineInjector.getNamedInstance(Submit.class);
@@ -161,10 +162,10 @@ public final class Launch {
client.setDriverInfo(driverIdentifier, driverMemory, jobSubmissionDirectory);
if (submit) {
- client.submit(dotNetFolder, true, null);
+ client.submit(dotNetFolder, true, isLocal, null);
client.waitForCompletion(waitTime);
} else {
- client.submit(dotNetFolder, false, config);
+ client.submit(dotNetFolder, false, isLocal, config);
client.waitForCompletion(0);
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aee7bebf/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
index 40bdbd3..a1ae9c4 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
@@ -30,7 +30,6 @@ import org.apache.reef.driver.evaluator.CompletedEvaluator;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.parameters.*;
import org.apache.reef.driver.task.*;
-import org.apache.reef.runtime.common.DriverRestartCompleted;
import org.apache.reef.runtime.common.driver.DriverRuntimeConfiguration;
import org.apache.reef.tang.formats.*;
import org.apache.reef.wake.EventHandler;
@@ -89,11 +88,6 @@ public final class DriverConfiguration extends ConfigurationModuleBuilder {
public static final RequiredImpl<EventHandler<StartTime>> ON_DRIVER_STARTED = new RequiredImpl<>();
/**
- * This event is fired in place of the ON_DRIVER_STARTED when the Driver is in fact restarted after failure.
- */
- public static final OptionalImpl<EventHandler<StartTime>> ON_DRIVER_RESTARTED = new OptionalImpl<>();
-
- /**
* The event handler invoked right before the driver shuts down. Defaults to ignore.
*/
public static final OptionalImpl<EventHandler<StopTime>> ON_DRIVER_STOP = new OptionalImpl<>();
@@ -138,11 +132,6 @@ public final class DriverConfiguration extends ConfigurationModuleBuilder {
public static final OptionalImpl<EventHandler<RunningTask>> ON_TASK_RUNNING = new OptionalImpl<>();
/**
- * Event handler for running tasks in previous evaluator, when driver restarted. Defaults to crash if not bound.
- */
- public static final OptionalImpl<EventHandler<RunningTask>> ON_DRIVER_RESTART_TASK_RUNNING = new OptionalImpl<>();
-
- /**
* Event handler for suspended tasks. Defaults to job failure if not bound. Rationale: many jobs don't support
* task suspension. Hence, this parameter should be optional. The only sane default is to crash the job, then.
*/
@@ -173,11 +162,6 @@ public final class DriverConfiguration extends ConfigurationModuleBuilder {
public static final OptionalImpl<EventHandler<ActiveContext>> ON_CONTEXT_ACTIVE = new OptionalImpl<>();
/**
- * Event handler for active context when driver restart. Defaults to closing the context if not bound.
- */
- public static final OptionalImpl<EventHandler<ActiveContext>> ON_DRIVER_RESTART_CONTEXT_ACTIVE = new OptionalImpl<>();
-
- /**
* Event handler for closed context. Defaults to logging if not bound.
*/
public static final OptionalImpl<EventHandler<ClosedContext>> ON_CONTEXT_CLOSED = new OptionalImpl<>();
@@ -198,16 +182,9 @@ public final class DriverConfiguration extends ConfigurationModuleBuilder {
public static final OptionalParameter<Integer> EVALUATOR_DISPATCHER_THREADS = new OptionalParameter<>();
/**
- * Event handler for the event of driver restart completion, default to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<DriverRestartCompleted>> ON_DRIVER_RESTART_COMPLETED =
- new OptionalImpl<>();
-
- /**
* ConfigurationModule to fill out to get a legal Driver Configuration.
*/
public static final ConfigurationModule CONF = new DriverConfiguration().merge(DriverRuntimeConfiguration.CONF)
-
.bindNamedParameter(DriverIdentifier.class, DRIVER_IDENTIFIER)
.bindNamedParameter(DriverMemory.class, DRIVER_MEMORY)
.bindNamedParameter(DriverJobSubmissionDirectory.class, DRIVER_JOB_SUBMISSION_DIRECTORY)
@@ -218,7 +195,6 @@ public final class DriverConfiguration extends ConfigurationModuleBuilder {
// Driver start/stop handlers
.bindSetEntry(DriverStartHandler.class, ON_DRIVER_STARTED)
- .bindNamedParameter(DriverRestartHandler.class, ON_DRIVER_RESTARTED)
.bindSetEntry(Clock.StartHandler.class, org.apache.reef.runtime.common.driver.DriverStartHandler.class)
.bindSetEntry(Clock.StopHandler.class, ON_DRIVER_STOP)
@@ -229,7 +205,6 @@ public final class DriverConfiguration extends ConfigurationModuleBuilder {
// Task handlers
.bindSetEntry(TaskRunningHandlers.class, ON_TASK_RUNNING)
- .bindSetEntry(DriverRestartTaskRunningHandlers.class, ON_DRIVER_RESTART_TASK_RUNNING)
.bindSetEntry(TaskFailedHandlers.class, ON_TASK_FAILED)
.bindSetEntry(TaskMessageHandlers.class, ON_TASK_MESSAGE)
.bindSetEntry(TaskCompletedHandlers.class, ON_TASK_COMPLETED)
@@ -237,7 +212,6 @@ public final class DriverConfiguration extends ConfigurationModuleBuilder {
// Context handlers
.bindSetEntry(ContextActiveHandlers.class, ON_CONTEXT_ACTIVE)
- .bindSetEntry(DriverRestartContextActiveHandlers.class, ON_DRIVER_RESTART_CONTEXT_ACTIVE)
.bindSetEntry(ContextClosedHandlers.class, ON_CONTEXT_CLOSED)
.bindSetEntry(ContextMessageHandlers.class, ON_CONTEXT_MESSAGE)
.bindSetEntry(ContextFailedHandlers.class, ON_CONTEXT_FAILED)
@@ -249,6 +223,5 @@ public final class DriverConfiguration extends ConfigurationModuleBuilder {
// Various parameters
.bindNamedParameter(EvaluatorDispatcherThreads.class, EVALUATOR_DISPATCHER_THREADS)
- .bindSetEntry(DriverRestartCompletedHandlers.class, ON_DRIVER_RESTART_COMPLETED)
.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aee7bebf/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
new file mode 100644
index 0000000..9973fad
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.client;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.parameters.DriverRestartCompletedHandlers;
+import org.apache.reef.driver.parameters.DriverRestartContextActiveHandlers;
+import org.apache.reef.driver.parameters.DriverRestartHandler;
+import org.apache.reef.driver.parameters.DriverRestartTaskRunningHandlers;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.runtime.common.DriverRestartCompleted;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalImpl;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+/**
+ * EventHandlers specific to Driver Restart. Please remember to bind a runtime-specific DriverRestartConfiguration,
+ * e.g. YarnDriverRestartConfiguration.
+ */
+@Public
+@ClientSide
+@Provided
+public final class DriverRestartConfiguration extends ConfigurationModuleBuilder {
+ /**
+ * This event is fired in place of the ON_DRIVER_STARTED when the Driver is in fact restarted after failure.
+ */
+ public static final OptionalImpl<EventHandler<StartTime>> ON_DRIVER_RESTARTED = new OptionalImpl<>();
+
+ /**
+ * Event handler for running tasks in previous evaluator, when driver restarted. Defaults to crash if not bound.
+ */
+ public static final OptionalImpl<EventHandler<RunningTask>> ON_DRIVER_RESTART_TASK_RUNNING = new OptionalImpl<>();
+
+ /**
+ * Event handler for active contexts when the driver restarts. Defaults to closing the context if not bound.
+ */
+ public static final OptionalImpl<EventHandler<ActiveContext>> ON_DRIVER_RESTART_CONTEXT_ACTIVE = new OptionalImpl<>();
+
+ /**
+ * Event handler for the event of driver restart completion. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<DriverRestartCompleted>> ON_DRIVER_RESTART_COMPLETED =
+ new OptionalImpl<>();
+
+ public static final ConfigurationModule CONF = new DriverRestartConfiguration()
+ .bindSetEntry(DriverRestartHandler.class, ON_DRIVER_RESTARTED)
+ .bindSetEntry(DriverRestartTaskRunningHandlers.class, ON_DRIVER_RESTART_TASK_RUNNING)
+ .bindSetEntry(DriverRestartContextActiveHandlers.class, ON_DRIVER_RESTART_CONTEXT_ACTIVE)
+ .bindSetEntry(DriverRestartCompletedHandlers.class, ON_DRIVER_RESTART_COMPLETED)
+ .build();
+
+ private DriverRestartConfiguration(){
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aee7bebf/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
index 7faeea0..d320f2f 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
@@ -119,11 +119,6 @@ public final class DriverServiceConfiguration extends ConfigurationModuleBuilder
public static final OptionalImpl<EventHandler<RunningTask>> ON_TASK_RUNNING = new OptionalImpl<>();
/**
- * Event handler for running tasks in previous evaluator, when driver restarted. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<RunningTask>> ON_DRIVER_RESTART_TASK_RUNNING = new OptionalImpl<>();
-
- /**
* Event handler for suspended tasks. Defaults to job failure if not bound. Rationale: many jobs don't support
* task suspension. Hence, this parameter should be optional. The only sane default is to crash the job, then.
*/
@@ -138,11 +133,6 @@ public final class DriverServiceConfiguration extends ConfigurationModuleBuilder
public static final OptionalImpl<EventHandler<ActiveContext>> ON_CONTEXT_ACTIVE = new OptionalImpl<>();
/**
- * Event handler for active context when driver restart. Defaults to closing the context if not bound.
- */
- public static final OptionalImpl<EventHandler<ActiveContext>> ON_DRIVER_RESTART_CONTEXT_ACTIVE = new OptionalImpl<>();
-
- /**
* Event handler for closed context. Defaults to logging if not bound.
*/
public static final OptionalImpl<EventHandler<ClosedContext>> ON_CONTEXT_CLOSED = new OptionalImpl<>();
@@ -179,7 +169,6 @@ public final class DriverServiceConfiguration extends ConfigurationModuleBuilder
// Task handlers
.bindSetEntry(ServiceTaskRunningHandlers.class, ON_TASK_RUNNING)
- .bindSetEntry(DriverRestartTaskRunningHandlers.class, ON_DRIVER_RESTART_TASK_RUNNING)
.bindSetEntry(ServiceTaskFailedHandlers.class, ON_TASK_FAILED)
.bindSetEntry(ServiceTaskMessageHandlers.class, ON_TASK_MESSAGE)
.bindSetEntry(ServiceTaskCompletedHandlers.class, ON_TASK_COMPLETED)
@@ -187,10 +176,9 @@ public final class DriverServiceConfiguration extends ConfigurationModuleBuilder
// Context handlers
.bindSetEntry(ServiceContextActiveHandlers.class, ON_CONTEXT_ACTIVE)
- .bindSetEntry(DriverRestartContextActiveHandlers.class, ON_DRIVER_RESTART_CONTEXT_ACTIVE)
.bindSetEntry(ServiceContextClosedHandlers.class, ON_CONTEXT_CLOSED)
.bindSetEntry(ServiceContextMessageHandlers.class, ON_CONTEXT_MESSAGE)
.bindSetEntry(ServiceContextFailedHandlers.class, ON_CONTEXT_FAILED)
.build();
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aee7bebf/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartHandler.java
index a3021ee..11b8bb3 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartHandler.java
@@ -23,10 +23,12 @@ import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.time.event.StartTime;
+import java.util.Set;
+
/**
* The StartTime event is routed to this EventHandler if there is a restart, instead of to DriverStartHandler.
*/
@NamedParameter(doc = "The StartTime event is routed to this EventHandler if there is a restart, " +
"instead of to DriverStartHandler.")
-public final class DriverRestartHandler implements Name<EventHandler<StartTime>> {
+public final class DriverRestartHandler implements Name<Set<EventHandler<StartTime>>> {
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aee7bebf/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartedHandlers.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartedHandlers.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartedHandlers.java
new file mode 100644
index 0000000..41d7f95
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartedHandlers.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import java.util.Set;
+
+/**
+ * Service Handler for driver restarts.
+ */
+@NamedParameter(doc = "Service Handler for driver restarts.")
+public final class ServiceDriverRestartedHandlers implements Name<Set<EventHandler<StartTime>>> {
+ private ServiceDriverRestartedHandlers(){
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aee7bebf/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
new file mode 100644
index 0000000..1f0a527
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.tang.formats.*;
+
+/**
+ * The base configuration module for driver restart configurations of all runtimes.
+ * <p/>
+ */
+@Private
+@ClientSide
+public final class DriverRuntimeRestartConfiguration extends ConfigurationModuleBuilder {
+
+ private DriverRuntimeRestartConfiguration() {
+ }
+
+ // TODO: bind service handlers in REEF-483
+ public static final ConfigurationModule CONF = new DriverRuntimeRestartConfiguration().build();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aee7bebf/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
index 92d78ca..19c2e7a 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
@@ -37,19 +37,19 @@ public final class DriverStartHandler implements EventHandler<StartTime> {
private static final Logger LOG = Logger.getLogger(DriverStartHandler.class.getName());
private final Set<EventHandler<StartTime>> startHandlers;
- private final Optional<EventHandler<StartTime>> restartHandler;
+ private final Optional<Set<EventHandler<StartTime>>> restartHandlers;
private final DriverStatusManager driverStatusManager;
@Inject
DriverStartHandler(@Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class)
final Set<EventHandler<StartTime>> startHandler,
- @Parameter(DriverRestartHandler.class) final EventHandler<StartTime> restartHandler,
+ @Parameter(DriverRestartHandler.class) final Set<EventHandler<StartTime>> restartHandlers,
final DriverStatusManager driverStatusManager) {
this.startHandlers = startHandler;
- this.restartHandler = Optional.of(restartHandler);
+ this.restartHandlers = Optional.of(restartHandlers);
this.driverStatusManager = driverStatusManager;
LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandler [{0}] and RestartHandler [{1}]",
- new String[]{this.startHandlers.toString(), this.restartHandler.toString()});
+ new String[]{this.startHandlers.toString(), this.restartHandlers.toString()});
}
@Inject
@@ -57,7 +57,7 @@ public final class DriverStartHandler implements EventHandler<StartTime> {
final Set<EventHandler<StartTime>> startHandler,
final DriverStatusManager driverStatusManager) {
this.startHandlers = startHandler;
- this.restartHandler = Optional.empty();
+ this.restartHandlers = Optional.empty();
this.driverStatusManager = driverStatusManager;
LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandler [{0}] and no RestartHandler",
this.startHandlers.toString());
@@ -73,8 +73,10 @@ public final class DriverStartHandler implements EventHandler<StartTime> {
}
private void onRestart(final StartTime startTime) {
- if (restartHandler.isPresent()) {
- this.restartHandler.get().onNext(startTime);
+ if (restartHandlers.isPresent()) {
+ for (EventHandler<StartTime> restartHandler : this.restartHandlers.get()) {
+ restartHandler.onNext(startTime);
+ }
} else {
throw new DriverFatalRuntimeException("Driver restart happened, but no ON_DRIVER_RESTART handler is bound.");
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aee7bebf/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriverRestart.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriverRestart.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriverRestart.java
new file mode 100644
index 0000000..90858d8
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriverRestart.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.hello;
+
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The Restart code for the Hello REEF Application.
+ */
+@Unit
+public final class HelloDriverRestart {
+
+ private static final Logger LOG = Logger.getLogger(HelloDriverRestart.class.getName());
+
+ /**
+ * Handles driver restarts by logging a message that includes the StartTime event.
+ */
+ public final class DriverRestartHandler implements EventHandler<StartTime> {
+ @Override
+ public void onNext(final StartTime value) {
+ LOG.log(Level.INFO, "Hello, driver restarted at " + value);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aee7bebf/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFYarn.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFYarn.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFYarn.java
index 78df28d..779b432 100644
--- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFYarn.java
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFYarn.java
@@ -37,8 +37,10 @@ public final class HelloREEFYarn {
/**
* Number of milliseconds to wait for the job to complete.
+ * Set to 100 sec because, on RM HA clusters, it takes around
+ * 50 seconds for the job to be set to running.
*/
- private static final int JOB_TIMEOUT = 30000; // 30 sec.
+ private static final int JOB_TIMEOUT = 100000; // 100 sec.
/**
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aee7bebf/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFYarnRestart.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFYarnRestart.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFYarnRestart.java
new file mode 100644
index 0000000..fe6e9e5
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFYarnRestart.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.hello;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.client.DriverRestartConfiguration;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.runtime.yarn.driver.YarnDriverRestartConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public final class HelloREEFYarnRestart {
+
+ private static final Logger LOG = Logger.getLogger(HelloREEFYarnRestart.class.getName());
+
+ /**
+ * Number of milliseconds to wait for the job to complete.
+ * Set to 100 sec because, on RM HA clusters, it takes around
+ * 50 seconds for the job to be set to running.
+ */
+ private static final int JOB_TIMEOUT = 100000; // 100 sec.
+
+
+ /**
+ * @return the configuration of the HelloREEF driver.
+ */
+ private static Configuration getDriverConfiguration() {
+ return
+ Configurations.merge(DriverConfiguration.CONF
+ .set(DriverConfiguration.GLOBAL_LIBRARIES,
+ HelloREEFYarnRestart.class.getProtectionDomain().getCodeSource().getLocation().getFile())
+ .set(DriverConfiguration.DRIVER_IDENTIFIER, "HelloREEF")
+ .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, HelloDriver.EvaluatorAllocatedHandler.class)
+ .build(),
+ YarnDriverRestartConfiguration.CONF
+ .build(),
+ DriverRestartConfiguration.CONF
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED,
+ HelloDriverRestart.DriverRestartHandler.class)
+ .build());
+ }
+
+ /**
+ * Start the Hello REEF job on YARN with driver restart configured and log its final status.
+ *
+ * @param args command line parameters.
+ * @throws org.apache.reef.tang.exceptions.BindException configuration error.
+ * @throws org.apache.reef.tang.exceptions.InjectionException configuration error.
+ */
+ public static void main(final String[] args) throws InjectionException {
+
+ final LauncherStatus status = DriverLauncher
+ .getLauncher(YarnClientConfiguration.CONF.build())
+ .run(getDriverConfiguration(), JOB_TIMEOUT);
+ LOG.log(Level.INFO, "REEF job completed: {0}", status);
+ }
+
+ /**
+ * Empty private constructor to prohibit instantiation of utility class.
+ */
+ private HelloREEFYarnRestart() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aee7bebf/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
new file mode 100644
index 0000000..7c11fec
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.runtime.common.driver.DriverRuntimeRestartConfiguration;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+
+/**
+ * Use this ConfigurationModule to configure YARN-specific Restart options for the driver.
+ * <p/>
+ */
+@ClientSide
+@Public
+@Provided
+public final class YarnDriverRestartConfiguration extends ConfigurationModuleBuilder {
+ /**
+ * The YARN driver restart ConfigurationModule, merged with DriverRuntimeRestartConfiguration.CONF.
+ */
+ // TODO: Bind runtime-specific restart logic for REEF-483.
+ public static final ConfigurationModule CONF = new YarnDriverRestartConfiguration()
+ .merge(DriverRuntimeRestartConfiguration.CONF)
+ .build();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aee7bebf/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java
index 5ea15a0..60468f8 100644
--- a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java
+++ b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java
@@ -237,6 +237,17 @@ public final class ReefEventStateManager {
}
/**
+ * Job Driver has been restarted.
+ */
+ public final class DriverRestartHandler implements EventHandler<StartTime> {
+ @Override
+ @SuppressWarnings("checkstyle:hiddenfield")
+ public void onNext(final StartTime restartTime) {
+ LOG.log(Level.INFO, "DriverRestartHandler called. StartTime: {0}", restartTime);
+ }
+ }
+
+ /**
* Job driver stopped, log the stop time.
*/
public final class StopStateHandler implements EventHandler<StopTime> {
@@ -296,7 +307,7 @@ public final class ReefEventStateManager {
/**
* Receive notification that a new Context is available.
*/
- public final class DrivrRestartActiveContextStateHandler implements EventHandler<ActiveContext> {
+ public final class DriverRestartActiveContextStateHandler implements EventHandler<ActiveContext> {
@Override
public void onNext(final ActiveContext context) {
synchronized (ReefEventStateManager.this) {