You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by af...@apache.org on 2015/10/27 23:05:53 UTC

incubator-reef git commit: [REEF-866] Refactor bridge clients to separate driver config generation from application submission

Repository: incubator-reef
Updated Branches:
  refs/heads/master db0462047 -> ebbbeacc8


[REEF-866] Refactor bridge clients to separate driver config generation from application submission

JIRA:
  [REEF-866](https://issues.apache.org/jira/browse/REEF-866)

Pull Request:
  Closes #585


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/ebbbeacc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/ebbbeacc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/ebbbeacc

Branch: refs/heads/master
Commit: ebbbeacc862e205c3ee05c41f22d061d0b9cd57e
Parents: db04620
Author: Anupam <an...@gmail.com>
Authored: Thu Oct 22 19:19:40 2015 -0700
Committer: Andrew Chung <af...@gmail.com>
Committed: Tue Oct 27 15:03:25 2015 -0700

----------------------------------------------------------------------
 .../apache/reef/bridge/client/LocalClient.java  |  47 +-----
 ...ocalRuntimeDriverConfigurationGenerator.java | 104 ++++++++++++
 .../YarnDriverConfigurationGenerator.java       | 160 +++++++++++++++++++
 .../bridge/client/YarnJobSubmissionClient.java  | 112 ++-----------
 .../bridge/client/YarnSubmissionFromCS.java     |   2 +-
 5 files changed, 290 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ebbbeacc/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
index 7a08ee2..e80164b 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
@@ -18,21 +18,15 @@
  */
 package org.apache.reef.bridge.client;
 
-import org.apache.reef.client.parameters.DriverConfigurationProviders;
-import org.apache.reef.driver.parameters.JobSubmissionDirectory;
 import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
-import org.apache.reef.runtime.common.files.REEFFileNames;
-import org.apache.reef.runtime.local.client.DriverConfigurationProvider;
 import org.apache.reef.runtime.local.client.PreparedDriverFolderLauncher;
-import org.apache.reef.tang.*;
-import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.tang.formats.AvroConfigurationSerializer;
 
 import javax.inject.Inject;
 import java.io.File;
 import java.io.IOException;
-import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -43,24 +37,14 @@ public final class LocalClient {
 
   private static final Logger LOG = Logger.getLogger(LocalClient.class.getName());
   private static final String CLIENT_REMOTE_ID = ClientRemoteIdentifier.NONE;
-  private final AvroConfigurationSerializer configurationSerializer;
   private final PreparedDriverFolderLauncher launcher;
-  private final REEFFileNames fileNames;
-  private final DriverConfigurationProvider driverConfigurationProvider;
-  private final Set<ConfigurationProvider> configurationProviders;
+  private final LocalRuntimeDriverConfigurationGenerator configurationGenerator;
 
   @Inject
-  private LocalClient(final AvroConfigurationSerializer configurationSerializer,
-                      final PreparedDriverFolderLauncher launcher,
-                      final REEFFileNames fileNames,
-                      final DriverConfigurationProvider driverConfigurationProvider,
-                      @Parameter(DriverConfigurationProviders.class)
-                      final Set<ConfigurationProvider> configurationProviders) {
-    this.configurationSerializer = configurationSerializer;
+  private LocalClient(final PreparedDriverFolderLauncher launcher,
+                      final LocalRuntimeDriverConfigurationGenerator configurationGenerator) {
     this.launcher = launcher;
-    this.fileNames = fileNames;
-    this.driverConfigurationProvider = driverConfigurationProvider;
-    this.configurationProviders = configurationProviders;
+    this.configurationGenerator = configurationGenerator;
   }
 
   private void submit(final LocalSubmissionFromCS localSubmissionFromCS) throws IOException {
@@ -70,23 +54,8 @@ public final class LocalClient {
       throw new IOException("The Driver folder " + driverFolder.getAbsolutePath() + " doesn't exist.");
     }
 
-    final Configuration driverConfiguration1 = driverConfigurationProvider
-        .getDriverConfiguration(localSubmissionFromCS.getJobFolder(), CLIENT_REMOTE_ID,
-            localSubmissionFromCS.getJobId(), Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER);
-    final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder();
-    for (final ConfigurationProvider configurationProvider : this.configurationProviders) {
-      configurationBuilder.addConfiguration(configurationProvider.getConfiguration());
-    }
-    final Configuration providedConfigurations = configurationBuilder.build();
-    final Configuration driverConfiguration = Configurations.merge(
-        driverConfiguration1,
-        Tang.Factory.getTang()
-            .newConfigurationBuilder()
-            .bindNamedParameter(JobSubmissionDirectory.class, driverFolder.toString())
-            .build(),
-        providedConfigurations);
-    final File driverConfigurationFile = new File(driverFolder, fileNames.getDriverConfigurationPath());
-    configurationSerializer.toFile(driverConfiguration, driverConfigurationFile);
+    configurationGenerator.writeConfiguration(localSubmissionFromCS.getJobFolder(),
+        localSubmissionFromCS.getJobId(), CLIENT_REMOTE_ID);
     launcher.launch(driverFolder, localSubmissionFromCS.getJobId(), CLIENT_REMOTE_ID);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ebbbeacc/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
new file mode 100644
index 0000000..6d8f931
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.driver.parameters.JobSubmissionDirectory;
+import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.local.client.DriverConfigurationProvider;
+import org.apache.reef.runtime.local.client.PreparedDriverFolderLauncher;
+import org.apache.reef.tang.*;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.AvroConfigurationSerializer;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Does client side manipulation of driver configuration for local runtime.
+ */
+final class LocalRuntimeDriverConfigurationGenerator {
+  private static final Logger LOG = Logger.getLogger(LocalRuntimeDriverConfigurationGenerator.class.getName());
+  private final REEFFileNames fileNames;
+  private final DriverConfigurationProvider driverConfigurationProvider;
+  private final Set<ConfigurationProvider> configurationProviders;
+  private final AvroConfigurationSerializer configurationSerializer;
+
+  @Inject
+  private LocalRuntimeDriverConfigurationGenerator(final AvroConfigurationSerializer configurationSerializer,
+                                           final REEFFileNames fileNames,
+                                           final DriverConfigurationProvider driverConfigurationProvider,
+                                           @Parameter(DriverConfigurationProviders.class)
+                                           final Set<ConfigurationProvider> configurationProviders){
+    this.fileNames = fileNames;
+    this.driverConfigurationProvider = driverConfigurationProvider;
+    this.configurationProviders = configurationProviders;
+    this.configurationSerializer = configurationSerializer;
+  }
+
+  /**
+   * Writes driver configuration to disk.
+   * @param jobFolder The folder in which the job is staged.
+   * @param jobId id of the job to be submitted
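+   * @param clientRemoteId remote identifier of the client, passed on to the driver configuration provider
+   * @return the driver configuration that was written to disk
+   * @throws IOException if the driver configuration cannot be written to disk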
+   */
+  public Configuration writeConfiguration(final File jobFolder,
+                                          final String jobId,
+                                          final String clientRemoteId) throws IOException {
+    final File driverFolder = new File(jobFolder, PreparedDriverFolderLauncher.DRIVER_FOLDER_NAME);
+
+    final Configuration driverConfiguration1 = driverConfigurationProvider
+        .getDriverConfiguration(jobFolder, clientRemoteId,
+        jobId, Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER);
+    final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder();
+    for (final ConfigurationProvider configurationProvider : this.configurationProviders) {
+      configurationBuilder.addConfiguration(configurationProvider.getConfiguration());
+    }
+    final Configuration providedConfigurations = configurationBuilder.build();
+    final Configuration driverConfiguration = Configurations.merge(
+        driverConfiguration1,
+        Tang.Factory.getTang()
+          .newConfigurationBuilder()
+          .bindNamedParameter(JobSubmissionDirectory.class, driverFolder.toString())
+          .build(),
+        providedConfigurations);
+    final File driverConfigurationFile = new File(driverFolder, fileNames.getDriverConfigurationPath());
+    configurationSerializer.toFile(driverConfiguration, driverConfigurationFile);
+    return driverConfiguration;
+  }
+
+  public static void main(final String[] args) throws InjectionException, IOException {
+    final LocalSubmissionFromCS localSubmission = LocalSubmissionFromCS.fromCommandLine(args);
+    LOG.log(Level.INFO, "Local driver config generation received from C#: {0}", localSubmission);
+    final Configuration localRuntimeConfiguration = localSubmission.getRuntimeConfiguration();
+    final LocalRuntimeDriverConfigurationGenerator localConfigurationGenerator = Tang.Factory.getTang()
+        .newInjector(localRuntimeConfiguration)
+        .getInstance(LocalRuntimeDriverConfigurationGenerator.class);
+    localConfigurationGenerator.writeConfiguration(localSubmission.getJobFolder(),
+        localSubmission.getJobId(), ClientRemoteIdentifier.NONE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ebbbeacc/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnDriverConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnDriverConfigurationGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnDriverConfigurationGenerator.java
new file mode 100644
index 0000000..7206a50
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnDriverConfigurationGenerator.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.client.DriverRestartConfiguration;
+import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.javabridge.generic.JobDriver;
+import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.yarn.driver.JobSubmissionDirectoryProvider;
+import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
+import org.apache.reef.runtime.yarn.driver.YarnDriverRestartConfiguration;
+import org.apache.reef.tang.*;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Does client side manipulation of driver configuration for YARN runtime.
+ */
+final class YarnDriverConfigurationGenerator {
+  private static final Logger LOG = Logger.getLogger(YarnDriverConfigurationGenerator.class.getName());
+  private final Set<ConfigurationProvider> configurationProviders;
+  private final int driverRestartEvaluatorRecoverySeconds;
+  private final REEFFileNames fileNames;
+  private final ConfigurationSerializer configurationSerializer;
+
+  @Inject
+  private YarnDriverConfigurationGenerator(@Parameter(DriverConfigurationProviders.class)
+                                   final Set<ConfigurationProvider> configurationProviders,
+                                   @Parameter(SubmissionDriverRestartEvaluatorRecoverySeconds.class)
+                                   final int driverRestartEvaluatorRecoverySeconds,
+                                   final ConfigurationSerializer configurationSerializer,
+                                   final REEFFileNames fileNames) {
+    this.configurationProviders = configurationProviders;
+    this.driverRestartEvaluatorRecoverySeconds = driverRestartEvaluatorRecoverySeconds;
+    this.fileNames = fileNames;
+    this.configurationSerializer = configurationSerializer;
+  }
+
+  /**
+   * Writes driver configuration to disk.
+   * @param driverFolder the folder containing the `reef` folder. Only that `reef` folder will be in the JAR.
+   * @param jobId id of the job to be submitted
+   * @param jobSubmissionFolder job submission folder on DFS
+   * @return the prepared driver configuration
+   * @throws IOException if the driver configuration cannot be written to disk
+   */
+  public Configuration writeConfiguration(final File driverFolder,
+                                          final String jobId,
+                                          final String jobSubmissionFolder) throws IOException {
+    final File driverConfigurationFile = new File(driverFolder, this.fileNames.getDriverConfigurationPath());
+    final Configuration yarnDriverConfiguration = YarnDriverConfiguration.CONF
+        .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, jobSubmissionFolder)
+        .set(YarnDriverConfiguration.JOB_IDENTIFIER, jobId)
+        .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE)
+        .set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0)
+        .build();
+
+    final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder();
+    for (final ConfigurationProvider configurationProvider : this.configurationProviders) {
+      configurationBuilder.addConfiguration(configurationProvider.getConfiguration());
+    }
+    final Configuration providedConfigurations =  configurationBuilder.build();
+
+    Configuration driverConfiguration = Configurations.merge(
+        Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER,
+        yarnDriverConfiguration,
+        providedConfigurations);
+
+    if (driverRestartEvaluatorRecoverySeconds > 0) {
+      LOG.log(Level.FINE, "Driver restart is enabled.");
+
+      final Configuration yarnDriverRestartConfiguration =
+          YarnDriverRestartConfiguration.CONF.build();
+
+      final Configuration driverRestartConfiguration =
+          DriverRestartConfiguration.CONF
+          .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED, JobDriver.RestartHandler.class)
+          .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE,
+            JobDriver.DriverRestartActiveContextHandler.class)
+          .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
+            JobDriver.DriverRestartRunningTaskHandler.class)
+          .set(DriverRestartConfiguration.DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS,
+            driverRestartEvaluatorRecoverySeconds)
+          .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED,
+            JobDriver.DriverRestartCompletedHandler.class)
+          .set(DriverRestartConfiguration.ON_DRIVER_RESTART_EVALUATOR_FAILED,
+            JobDriver.DriverRestartFailedEvaluatorHandler.class)
+          .build();
+
+      driverConfiguration = Configurations.merge(
+        driverConfiguration, yarnDriverRestartConfiguration, driverRestartConfiguration);
+    }
+
+    this.configurationSerializer.toFile(driverConfiguration, driverConfigurationFile);
+    return driverConfiguration;
+  }
+
+  /**
+   * Entry point invoked from .NET: writes the YARN driver configuration to disk and then exits the JVM.
+   * For details on the arguments:
+   * @see org.apache.reef.bridge.client.YarnSubmissionFromCS#fromCommandLine(String[])
+   */
+  public static void main(final String[] args) throws InjectionException, IOException {
+    final YarnSubmissionFromCS yarnSubmission = YarnSubmissionFromCS.fromCommandLine(args);
+    LOG.log(Level.INFO, "YARN driver config generation received from C#: {0}", yarnSubmission);
+    final Configuration yarnConfiguration = yarnSubmission.getRuntimeConfiguration();
+
+    final Injector injector = Tang.Factory.getTang().newInjector(yarnConfiguration);
+    final YarnDriverConfigurationGenerator yarnClientConfigurationGenerator =
+        injector.getInstance(YarnDriverConfigurationGenerator.class);
+    final JobSubmissionDirectoryProvider directoryProvider = injector.getInstance(JobSubmissionDirectoryProvider.class);
+    final String jobId = yarnSubmission.getJobId();
+
+    LOG.log(Level.INFO, "Writing driver config for job {0}", jobId);
+    yarnClientConfigurationGenerator.writeConfiguration(
+        yarnSubmission.getDriverFolder(), jobId, directoryProvider.getJobSubmissionDirectoryPath(jobId).toString());
+    LOG.log(Level.INFO, "End of main in Java YarnDriverConfigurationGenerator");
+    System.exit(0); // Keep last: statements after this call never run.
+  }
+}
+
+/**
+ * How long the driver should wait before timing out on evaluator
+ * recovery in seconds. Defaults to -1. Driver restart is only enabled when the value is positive;
+ * zero or a negative value leaves it disabled. Only used by .NET job submission.
+ */
+@NamedParameter(doc = "How long the driver should wait before timing out on evaluator" +
+    " recovery in seconds. Defaults to -1. If value is negative, the restart functionality will not be" +
+    " enabled. Only used by .NET job submission.", default_value = "-1")
+final class SubmissionDriverRestartEvaluatorRecoverySeconds implements Name<Integer> {
+  private SubmissionDriverRestartEvaluatorRecoverySeconds() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ebbbeacc/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index 53be5dd..843f677 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -18,19 +18,17 @@
  */
 package org.apache.reef.bridge.client;
 
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.reef.client.DriverRestartConfiguration;
-import org.apache.reef.client.parameters.DriverConfigurationProviders;
 import org.apache.reef.driver.parameters.MaxApplicationSubmissions;
 import org.apache.reef.driver.parameters.ResourceManagerPreserveEvaluators;
-import org.apache.reef.javabridge.generic.JobDriver;
-import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
 import org.apache.reef.runtime.common.files.ClasspathProvider;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix;
@@ -38,22 +36,20 @@ import org.apache.reef.runtime.yarn.client.SecurityTokenProvider;
 import org.apache.reef.runtime.yarn.client.YarnSubmissionHelper;
 import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
 import org.apache.reef.runtime.yarn.client.uploader.JobUploader;
-import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
-import org.apache.reef.runtime.yarn.driver.YarnDriverRestartConfiguration;
-import org.apache.reef.tang.*;
-import org.apache.reef.tang.annotations.Name;
-import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.tang.formats.ConfigurationSerializer;
 import org.apache.reef.util.JARFileMaker;
+
 import javax.inject.Inject;
 import java.io.*;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.List;
-import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -69,11 +65,10 @@ public final class YarnJobSubmissionClient {
   private final REEFFileNames fileNames;
   private final YarnConfiguration yarnConfiguration;
   private final ClasspathProvider classpath;
-  private final Set<ConfigurationProvider> configurationProviders;
   private final int maxApplicationSubmissions;
-  private final int driverRestartEvaluatorRecoverySeconds;
   private final SecurityTokenProvider tokenProvider;
   private final List<String> commandPrefixList;
+  private final YarnDriverConfigurationGenerator configurationGenerator;
 
   @Inject
   YarnJobSubmissionClient(final JobUploader uploader,
@@ -81,78 +76,21 @@ public final class YarnJobSubmissionClient {
                           final ConfigurationSerializer configurationSerializer,
                           final REEFFileNames fileNames,
                           final ClasspathProvider classpath,
-                          @Parameter(DriverConfigurationProviders.class)
-                          final Set<ConfigurationProvider> configurationProviders,
                           @Parameter(MaxApplicationSubmissions.class)
                           final int maxApplicationSubmissions,
                           @Parameter(DriverLaunchCommandPrefix.class)
                           final List<String> commandPrefixList,
-                          @Parameter(SubmissionDriverRestartEvaluatorRecoverySeconds.class)
-                          final int driverRestartEvaluatorRecoverySeconds,
-                          final SecurityTokenProvider tokenProvider) {
+                          final SecurityTokenProvider tokenProvider,
+                          final YarnDriverConfigurationGenerator configurationGenerator) {
     this.uploader = uploader;
     this.configurationSerializer = configurationSerializer;
     this.fileNames = fileNames;
     this.yarnConfiguration = yarnConfiguration;
     this.classpath = classpath;
-    this.configurationProviders = configurationProviders;
     this.maxApplicationSubmissions = maxApplicationSubmissions;
-    this.driverRestartEvaluatorRecoverySeconds = driverRestartEvaluatorRecoverySeconds;
     this.tokenProvider = tokenProvider;
     this.commandPrefixList = commandPrefixList;
-  }
-
-  private Configuration addYarnDriverConfiguration(final File driverFolder,
-                                                   final String jobId,
-                                                   final String jobSubmissionFolder)
-      throws IOException {
-    final File driverConfigurationFile = new File(driverFolder, this.fileNames.getDriverConfigurationPath());
-    final Configuration yarnDriverConfiguration = YarnDriverConfiguration.CONF
-        .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, jobSubmissionFolder)
-        .set(YarnDriverConfiguration.JOB_IDENTIFIER, jobId)
-        .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE)
-        .set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0)
-        .build();
-
-    final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder();
-    for (final ConfigurationProvider configurationProvider : this.configurationProviders) {
-      configurationBuilder.addConfiguration(configurationProvider.getConfiguration());
-    }
-    final Configuration providedConfigurations =  configurationBuilder.build();
-
-    Configuration driverConfiguration = Configurations.merge(
-        Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER,
-        yarnDriverConfiguration,
-        providedConfigurations);
-
-    if (driverRestartEvaluatorRecoverySeconds > 0) {
-      LOG.log(Level.FINE, "Driver restart is enabled.");
-
-      final Configuration yarnDriverRestartConfiguration =
-          YarnDriverRestartConfiguration.CONF
-              .build();
-
-      final Configuration driverRestartConfiguration =
-          DriverRestartConfiguration.CONF
-              .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED, JobDriver.RestartHandler.class)
-              .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE,
-                  JobDriver.DriverRestartActiveContextHandler.class)
-              .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
-                  JobDriver.DriverRestartRunningTaskHandler.class)
-              .set(DriverRestartConfiguration.DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS,
-                  driverRestartEvaluatorRecoverySeconds)
-              .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED,
-                  JobDriver.DriverRestartCompletedHandler.class)
-              .set(DriverRestartConfiguration.ON_DRIVER_RESTART_EVALUATOR_FAILED,
-                  JobDriver.DriverRestartFailedEvaluatorHandler.class)
-              .build();
-
-      driverConfiguration = Configurations.merge(
-          driverConfiguration, yarnDriverRestartConfiguration, driverRestartConfiguration);
-    }
-
-    this.configurationSerializer.toFile(driverConfiguration, driverConfigurationFile);
-    return driverConfiguration;
+    this.configurationGenerator = configurationGenerator;
   }
 
   /**
@@ -181,8 +119,9 @@ public final class YarnJobSubmissionClient {
       // Prepare the JAR
       final JobFolder jobFolderOnDFS = this.uploader.createJobFolder(submissionHelper.getApplicationId());
       final Configuration jobSubmissionConfiguration =
-          this.addYarnDriverConfiguration(yarnSubmission.getDriverFolder(), yarnSubmission.getJobId(),
-              jobFolderOnDFS.getPath().toString());
+          this.configurationGenerator.writeConfiguration(yarnSubmission.getDriverFolder(),
+            yarnSubmission.getJobId(),
+            jobFolderOnDFS.getPath().toString());
       final File jarFile = makeJar(yarnSubmission.getDriverFolder());
       LOG.log(Level.INFO, "Created job submission jar file: {0}", jarFile);
 
@@ -288,13 +227,9 @@ public final class YarnJobSubmissionClient {
   }
 
   /**
-   * Takes 5 parameters from the C# side:
-   * [0]: String. Driver folder.
-   * [1]: String. Driver identifier.
-   * [2]: int. Driver memory.
-   * [3~5]: int. TCP configurations.
-   * [6]: int. Max application submissions.
-   * [7]: int. Evaluator recovery timeout for driver restart. > 0 => restart is enabled.
+   * .NET client calls into this main method for job submission.
+   * For details on the arguments:
+   * @see org.apache.reef.bridge.client.YarnSubmissionFromCS#fromCommandLine(String[])
    */
   public static void main(final String[] args) throws InjectionException, IOException, YarnException {
     final YarnSubmissionFromCS yarnSubmission = YarnSubmissionFromCS.fromCommandLine(args);
@@ -318,16 +253,3 @@ public final class YarnJobSubmissionClient {
     LOG.log(Level.INFO, "End of main in Java YarnJobSubmissionClient");
   }
 }
-
-/**
- * How long the driver should wait before timing out on evaluator
- * recovery in seconds. Defaults to -1. If value is negative, the restart functionality will not be
- * enabled. Only used by .NET job submission.
- */
-@NamedParameter(doc = "How long the driver should wait before timing out on evaluator" +
-    " recovery in seconds. Defaults to -1. If value is negative, the restart functionality will not be" +
-    " enabled. Only used by .NET job submission.", default_value = "-1")
-final class SubmissionDriverRestartEvaluatorRecoverySeconds implements Name<Integer> {
-  private SubmissionDriverRestartEvaluatorRecoverySeconds() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ebbbeacc/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
index 2ab6c9b..9b4c35b 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
@@ -197,7 +197,7 @@ final class YarnSubmissionFromCS {
   }
 
   /**
-   * Takes 5 parameters from the C# side:
+   * Takes 9 parameters from the C# side:
    * [0]: String. Driver folder.
    * [1]: String. Driver identifier.
    * [2]: int. Driver memory.