You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2015/11/16 04:32:20 UTC

[1/2] incubator-reef git commit: [REEF-910] Assemble entire driver configuration on driver instead of client

Repository: incubator-reef
Updated Branches:
  refs/heads/master fc5d9079b -> d55433956


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
index 2fa8602..c424fbc 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.reef.runtime.common.REEFLauncher;
 import org.apache.reef.runtime.common.files.ClasspathProvider;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
@@ -54,9 +55,9 @@ public final class YarnSubmissionHelper implements Closeable{
   private final REEFFileNames fileNames;
   private final ClasspathProvider classpath;
   private final SecurityTokenProvider tokenProvider;
-  private boolean preserveEvaluators;
-  private int maxAppSubmissions;
   private final List<String> commandPrefixList;
+  private Class launcherClazz;
+  private String confFileName;
 
   public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
                               final REEFFileNames fileNames,
@@ -77,10 +78,10 @@ public final class YarnSubmissionHelper implements Closeable{
     this.applicationResponse = yarnClientApplication.getNewApplicationResponse();
     this.applicationSubmissionContext = yarnClientApplication.getApplicationSubmissionContext();
     this.applicationId = applicationSubmissionContext.getApplicationId();
-    this.maxAppSubmissions = 1;
-    this.preserveEvaluators = false;
     this.tokenProvider = tokenProvider;
     this.commandPrefixList = commandPrefixList;
+    this.launcherClazz = REEFLauncher.class;
+    this.confFileName = this.fileNames.getDriverConfigurationPath();
     LOG.log(Level.FINEST, "YARN Application ID: {0}", applicationId);
   }
 
@@ -201,10 +202,33 @@ public final class YarnSubmissionHelper implements Closeable{
     return this;
   }
 
+  /**
+   * Sets the launcher class for the job.
+   * @param launcherClass
+   * @return
+   */
+  public YarnSubmissionHelper setLauncherClass(final Class launcherClass) {
+    this.launcherClazz = launcherClass;
+    return this;
+  }
+
+  /**
+   * Sets the configuration file for the job.
+   * Note that this does not have to be the Driver TANG configuration. In the bootstrap
+   * launch case, this can be the Avro file that supports the generation of a driver
+   * configuration file natively at the Launcher.
+   * @param configurationFileName
+   * @return
+   */
+  public YarnSubmissionHelper setConfigurationFileName(final String configurationFileName) {
+    this.confFileName = configurationFileName;
+    return this;
+  }
+
   public void submit() throws IOException, YarnException {
     // SET EXEC COMMAND
-    final List<String> launchCommand = new JavaLaunchCommandBuilder(commandPrefixList)
-        .setConfigurationFileName(this.fileNames.getDriverConfigurationPath())
+    final List<String> launchCommand = new JavaLaunchCommandBuilder(launcherClazz, commandPrefixList)
+        .setConfigurationFileName(confFileName)
         .setClassPath(this.classpath.getDriverClasspath())
         .setMemory(this.applicationSubmissionContext.getResource().getMemory())
         .setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.fileNames.getDriverStdoutFileName())


[2/2] incubator-reef git commit: [REEF-910] Assemble entire driver configuration on driver instead of client

Posted by ju...@apache.org.
[REEF-910] Assemble entire driver configuration on driver instead of client

This addressed the issue by
  * Adds the Bootstrap Launcher for YARN, which removes Java dependency if submitted through REST.
  * Modifies DriverLauncher to accomodate other types of launchers.
  * Moves YARN submission through .NET into the new two-step process.

JIRA:
  [REEF-910](https://issues.apache.org/jira/browse/REEF-910)

This closes #631


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/d5543395
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/d5543395
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/d5543395

Branch: refs/heads/master
Commit: d5543395691ca0baa8a879fb190531ecba56edce
Parents: fc5d907
Author: Andrew Chung <af...@gmail.com>
Authored: Wed Nov 11 17:38:51 2015 -0800
Committer: Julia Wang <ju...@apache.org>
Committed: Sun Nov 15 18:58:23 2015 -0800

----------------------------------------------------------------------
 .../Org.Apache.REEF.Bridge/DriverLauncher.cpp   |  11 +-
 .../AvroYarnClusterJobSubmissionParameters.cs   |   4 +-
 .../YARN/AvroYarnJobSubmissionParameters.cs     |  12 +-
 .../Common/DriverFolderPreparationHelper.cs     |   2 +-
 lang/java/reef-bridge-client/pom.xml            |   4 +
 .../src/main/avro/JobSubmissionParameters.avsc  |   5 +-
 .../YarnBootstrapDriverConfigGenerator.java     | 145 ++++++++++++
 .../client/YarnBootstrapREEFLauncher.java       |  71 ++++++
 .../client/YarnClusterSubmissionFromCS.java     | 202 +++++++++++++++++
 .../YarnDriverConfigurationGenerator.java       | 167 --------------
 .../bridge/client/YarnJobSubmissionClient.java  |  80 ++++---
 ...arnJobSubmissionParametersFileGenerator.java |  82 +++++++
 .../bridge/client/YarnSubmissionFromCS.java     | 219 -------------------
 ...SubmissionParametersSerializationFromCS.java | 164 ++++++++++++++
 .../apache/reef/bridge/client/package-info.java |  22 ++
 .../reef/runtime/common/REEFLauncher.java       |  31 ---
 .../runtime/common/files/REEFFileNames.java     |  13 ++
 .../common/launch/JavaLaunchCommandBuilder.java |  48 +++-
 .../yarn/client/YarnSubmissionHelper.java       |  36 ++-
 19 files changed, 838 insertions(+), 480 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/cs/Org.Apache.REEF.Bridge/DriverLauncher.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/DriverLauncher.cpp b/lang/cs/Org.Apache.REEF.Bridge/DriverLauncher.cpp
index d458ae4..f00244b 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/DriverLauncher.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/DriverLauncher.cpp
@@ -49,8 +49,9 @@ const int maxPathBufSize = 16 * 1024;
 const char ClassPathSeparatorCharForWindows = ';';
 
 // we look for this to delineate java vm arguments from app arguments.
-// todo: be smarter about this. Accomodate arbitrary apps.
-const char* launcherClass = "org.apache.reef.runtime.common.REEFLauncher";
+// TODO: be smarter about this. Accomodate arbitrary apps that doesn't 
+// contain the term REEFLauncher.
+const char* launcherClassSearchStr = "REEFLauncher";
 
 // method to invoke
 const char* JavaMainMethodName = "main";
@@ -89,7 +90,7 @@ void GetCounts(
     firstArgOrdinal = -1;
 
     for (int i = firstOptionOrdinal; i < cArgs; i++) {
-        if (option && 0 == strcmp(argv[i], launcherClass)) {
+        if (option && NULL != strstr(argv[i], launcherClassSearchStr)) {
             option = false;
             firstArgOrdinal = i;
         }
@@ -100,6 +101,10 @@ void GetCounts(
             ++argCount;
         }
     }
+
+    if (firstArgOrdinal < 0) {
+        throw gcnew ArgumentException("Unable to find a REEF Launcher");
+    }
 }
 
 //

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
index f31874c..3d09212 100644
--- a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
@@ -28,7 +28,7 @@ namespace Org.Apache.REEF.Client.Avro.YARN
     [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
     public sealed class AvroYarnClusterJobSubmissionParameters
     {
-        private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}}
 ,{""name"":""driverMemory"",""type"":""int""},{""name"":""driverRecoveryTimeout"",""type"":""int""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}},{""name"":""maxApplicationSubmissions"",""type"":""int""},{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""}]}";
+        private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}}
 ,{""name"":""driverMemory"",""type"":""int""},{""name"":""driverRecoveryTimeout"",""type"":""int""},{""name"":""dfsJobSubmissionFolder"",""type"":[""null"",""string""]},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}},{""name"":""maxApplicationSubmissions"",""type"":""int""},{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""}]}";
 
         /// <summary>
         /// Gets the schema.
@@ -70,6 +70,8 @@ namespace Org.Apache.REEF.Client.Avro.YARN
         /// </summary>
         public AvroYarnClusterJobSubmissionParameters()
         {
+            this.securityTokenKind = "NULL";
+            this.securityTokenService = "NULL";
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs
index a3be9b4..3e966ec 100644
--- a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs
@@ -28,7 +28,7 @@ namespace Org.Apache.REEF.Client.Avro.YARN
     [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
     public sealed class AvroYarnJobSubmissionParameters
     {
-        private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""driverMemory"",""type"":""int""},{""name"":""driverRecoveryTimeout"",""type"":""int""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}";
+        private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""driverMemory"",""type"":""int""},{""name"":""driverRecoveryTimeout"",""type"":""int""},{""name"":""dfsJobSubmissionFolder"",""type"":[""null"",""string""]},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}";
 
         /// <summary>
         /// Gets the schema.
@@ -60,6 +60,12 @@ namespace Org.Apache.REEF.Client.Avro.YARN
         public int driverRecoveryTimeout { get; set; }
 
         /// <summary>
+        /// Gets or sets the dfsJobSubmissionFolder field.
+        /// </summary>
+        [DataMember]
+        public string dfsJobSubmissionFolder { get; set; }
+
+        /// <summary>
         /// Gets or sets the jobSubmissionDirectoryPrefix field.
         /// </summary>
         [DataMember]
@@ -78,12 +84,14 @@ namespace Org.Apache.REEF.Client.Avro.YARN
         /// <param name="sharedJobSubmissionParameters">The sharedJobSubmissionParameters.</param>
         /// <param name="driverMemory">The driverMemory.</param>
         /// <param name="driverRecoveryTimeout">The driverRecoveryTimeout.</param>
+        /// <param name="dfsJobSubmissionFolder">The dfsJobSubmissionFolder.</param>
         /// <param name="jobSubmissionDirectoryPrefix">The jobSubmissionDirectoryPrefix.</param>
-        public AvroYarnJobSubmissionParameters(AvroJobSubmissionParameters sharedJobSubmissionParameters, int driverMemory, int driverRecoveryTimeout, string jobSubmissionDirectoryPrefix)
+        public AvroYarnJobSubmissionParameters(AvroJobSubmissionParameters sharedJobSubmissionParameters, int driverMemory, int driverRecoveryTimeout, string dfsJobSubmissionFolder, string jobSubmissionDirectoryPrefix)
         {
             this.sharedJobSubmissionParameters = sharedJobSubmissionParameters;
             this.driverMemory = driverMemory;
             this.driverRecoveryTimeout = driverRecoveryTimeout;
+            this.dfsJobSubmissionFolder = dfsJobSubmissionFolder;
             this.jobSubmissionDirectoryPrefix = jobSubmissionDirectoryPrefix;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/cs/Org.Apache.REEF.Client/Common/DriverFolderPreparationHelper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Common/DriverFolderPreparationHelper.cs b/lang/cs/Org.Apache.REEF.Client/Common/DriverFolderPreparationHelper.cs
index 336685f..d422737 100644
--- a/lang/cs/Org.Apache.REEF.Client/Common/DriverFolderPreparationHelper.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Common/DriverFolderPreparationHelper.cs
@@ -182,7 +182,7 @@ namespace Org.Apache.REEF.Client.Common
             var lowerCasePath = fileName.ToLower();
             return lowerCasePath.EndsWith(DLLFileNameExtension) ||
                    lowerCasePath.EndsWith(EXEFileNameExtension) ||
-                   lowerCasePath.StartsWith(ClientConstants.DriverJarFilePrefix);
+                   lowerCasePath.StartsWith(ClientConstants.ClientJarFilePrefix);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/pom.xml b/lang/java/reef-bridge-client/pom.xml
index e0b31c0..9b1a1a6 100644
--- a/lang/java/reef-bridge-client/pom.xml
+++ b/lang/java/reef-bridge-client/pom.xml
@@ -98,6 +98,10 @@ under the License.
             <artifactId>hadoop-common</artifactId>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
 
     </dependencies>
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
index 1bf830d..ef06e25 100644
--- a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
+++ b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
@@ -49,6 +49,7 @@
         { "name": "sharedJobSubmissionParameters", "type": "AvroJobSubmissionParameters" },
         { "name": "driverMemory", "type": "int" },
         { "name": "driverRecoveryTimeout", "type": "int" },
+        { "name": "dfsJobSubmissionFolder", "type": ["null", "string"], "default" : null },
         { "name": "jobSubmissionDirectoryPrefix", "type": "string" }
       ]
   },
@@ -60,8 +61,8 @@
       "fields": [
         { "name": "yarnJobSubmissionParameters", "type": "AvroYarnJobSubmissionParameters" },
         { "name": "maxApplicationSubmissions", "type": "int" },
-        { "name": "securityTokenKind", "type": "string" },
-        { "name": "securityTokenService", "type": "string" }
+        { "name": "securityTokenKind", "type": "string", "default": "NULL" },
+        { "name": "securityTokenService", "type": "string", "default": "NULL" }
       ]
   }
 ]

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java
new file mode 100644
index 0000000..23f86ff
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.JsonDecoder;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.reef.client.DriverRestartConfiguration;
+import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.io.TcpPortConfigurationProvider;
+import org.apache.reef.javabridge.generic.JobDriver;
+import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters;
+import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
+import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
+import org.apache.reef.runtime.yarn.driver.YarnDriverRestartConfiguration;
+import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix;
+import org.apache.reef.tang.*;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This is the Java Driver configuration generator for .NET Drivers that generates
+ * the Driver configuration at runtime. Called by {@link YarnBootstrapREEFLauncher}.
+ */
+final class YarnBootstrapDriverConfigGenerator {
+  private static final Logger LOG = Logger.getLogger(YarnBootstrapDriverConfigGenerator.class.getName());
+
+  private final REEFFileNames reefFileNames;
+  private final ConfigurationSerializer configurationSerializer;
+
+  @Inject
+  private YarnBootstrapDriverConfigGenerator(final REEFFileNames reefFileNames,
+                                             final ConfigurationSerializer configurationSerializer) {
+    this.configurationSerializer = configurationSerializer;
+    this.reefFileNames = reefFileNames;
+  }
+
+  public String writeDriverConfigurationFile(final String bootstrapArgsLocation) throws IOException {
+    final File bootstrapArgsFile = new File(bootstrapArgsLocation);
+    final AvroYarnJobSubmissionParameters yarnBootstrapArgs =
+        readYarnJobSubmissionParametersFromFile(bootstrapArgsFile);
+    final String driverConfigPath = reefFileNames.getDriverConfigurationPath();
+
+    this.configurationSerializer.toFile(getYarnDriverConfiguration(yarnBootstrapArgs),
+        new File(driverConfigPath));
+
+    return driverConfigPath;
+  }
+
+  static Configuration getYarnDriverConfiguration(
+      final AvroYarnJobSubmissionParameters yarnJobSubmissionParams) {
+    final AvroJobSubmissionParameters jobSubmissionParameters =
+        yarnJobSubmissionParams.getSharedJobSubmissionParameters();
+    final Configuration providerConfig = Tang.Factory.getTang().newConfigurationBuilder()
+        .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class)
+        .bindNamedParameter(TcpPortRangeBegin.class, Integer.toString(jobSubmissionParameters.getTcpBeginPort()))
+        .bindNamedParameter(TcpPortRangeCount.class, Integer.toString(jobSubmissionParameters.getTcpRangeCount()))
+        .bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(jobSubmissionParameters.getTcpTryCount()))
+        .bindNamedParameter(JobSubmissionDirectoryPrefix.class,
+            yarnJobSubmissionParams.getJobSubmissionDirectoryPrefix().toString())
+        .build();
+
+    final Configuration yarnDriverConfiguration = YarnDriverConfiguration.CONF
+        .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY,
+            yarnJobSubmissionParams.getDfsJobSubmissionFolder().toString())
+        .set(YarnDriverConfiguration.JOB_IDENTIFIER, jobSubmissionParameters.getJobId().toString())
+        .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE)
+        .set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0)
+        .build();
+
+    final Configuration driverConfiguration = Configurations.merge(
+        Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER, yarnDriverConfiguration, providerConfig);
+
+    if (yarnJobSubmissionParams.getDriverRecoveryTimeout() > 0) {
+      LOG.log(Level.FINE, "Driver restart is enabled.");
+
+      final Configuration yarnDriverRestartConfiguration =
+          YarnDriverRestartConfiguration.CONF.build();
+
+      final Configuration driverRestartConfiguration =
+          DriverRestartConfiguration.CONF
+              .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED, JobDriver.RestartHandler.class)
+              .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE,
+                  JobDriver.DriverRestartActiveContextHandler.class)
+              .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
+                  JobDriver.DriverRestartRunningTaskHandler.class)
+              .set(DriverRestartConfiguration.DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS,
+                  yarnJobSubmissionParams.getDriverRecoveryTimeout())
+              .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED,
+                  JobDriver.DriverRestartCompletedHandler.class)
+              .set(DriverRestartConfiguration.ON_DRIVER_RESTART_EVALUATOR_FAILED,
+                  JobDriver.DriverRestartFailedEvaluatorHandler.class)
+              .build();
+
+      return Configurations.merge(driverConfiguration, yarnDriverRestartConfiguration, driverRestartConfiguration);
+    }
+
+    return driverConfiguration;
+  }
+
+  static AvroYarnJobSubmissionParameters readYarnJobSubmissionParametersFromFile(final File file)
+      throws IOException {
+    try (final FileInputStream fileInputStream = new FileInputStream(file)) {
+      // This is mainly a test hook.
+      return readYarnJobSubmissionParametersFromInputStream(fileInputStream);
+    }
+  }
+
+  static AvroYarnJobSubmissionParameters readYarnJobSubmissionParametersFromInputStream(
+      final InputStream inputStream) throws IOException {
+    final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
+        AvroYarnJobSubmissionParameters.getClassSchema(), inputStream);
+    final SpecificDatumReader<AvroYarnJobSubmissionParameters> reader = new SpecificDatumReader<>(
+        AvroYarnJobSubmissionParameters.class);
+    return reader.read(null, decoder);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java
new file mode 100644
index 0000000..5f6b5c2
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.annotations.audience.Interop;
+import org.apache.reef.runtime.common.REEFLauncher;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This is a bootstrap launcher for YARN for submission from C#. It allows for Java Driver
+ * configuration generation directly on the Driver without need of Java dependency if REST
+ * submission is used. Note that the name of the class must contain "REEFLauncher" for the time
+ * being in order for the Interop code to discover the class.
+ */
+@Interop(CppFiles = "DriverLauncher.cpp")
+public final class YarnBootstrapREEFLauncher {
+  private static final Logger LOG = Logger.getLogger(YarnBootstrapREEFLauncher.class.getName());
+
+  public static void main(final String[] args) throws IOException, InjectionException {
+    LOG.log(Level.INFO, "Entering BootstrapLauncher.main().");
+
+    if (args.length != 1) {
+      final String message = "Bootstrap launcher should have a single configuration file input specifying the" +
+          " job submission parameters to be deserialized to create the YarnDriverConfiguration on the fly.";
+
+      throw fatal(message, new IllegalArgumentException(message));
+    }
+
+    try {
+      final YarnBootstrapDriverConfigGenerator yarnDriverConfigurationGenerator =
+          Tang.Factory.getTang().newInjector().getInstance(YarnBootstrapDriverConfigGenerator.class);
+      REEFLauncher.main(new String[]{yarnDriverConfigurationGenerator.writeDriverConfigurationFile(args[0])});
+    } catch (final Exception exception) {
+      if (!(exception instanceof RuntimeException)) {
+        throw fatal("Failed to initialize configurations.", exception);
+      }
+
+      throw exception;
+    }
+  }
+
+  private static RuntimeException fatal(final String msg, final Throwable t) {
+    LOG.log(Level.SEVERE, msg, t);
+    return new RuntimeException(msg, t);
+  }
+
+  private YarnBootstrapREEFLauncher(){
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
new file mode 100644
index 0000000..f0b5e6c
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.JsonDecoder;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.lang.Validate;
+import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters;
+import org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters;
+import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Represents a job submission from the CS code.
+ * <p>
+ * This class exists mostly to parse and validate the command line parameters provided by the C# class
+ * `Org.Apache.REEF.Client.YARN.YARNClient`
+ */
+final class YarnClusterSubmissionFromCS {
+  private static final int DEFAULT_PRIORITY = 1;
+  private static final String DEFAULT_QUEUE = "default";
+
+  private final File driverFolder;
+  private final String jobId;
+  private final int driverMemory;
+  private final int tcpBeginPort;
+  private final int tcpRangeCount;
+  private final int tcpTryCount;
+  private final int maxApplicationSubmissions;
+  private final int driverRecoveryTimeout;
+
+  // Static for now
+  private final int priority;
+  private final String queue;
+  private final String tokenKind;
+  private final String tokenService;
+  private final String jobSubmissionDirectoryPrefix;
+  private final AvroYarnJobSubmissionParameters yarnJobSubmissionParameters;
+
+  private YarnClusterSubmissionFromCS(final AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters) {
+    yarnJobSubmissionParameters = yarnClusterJobSubmissionParameters.getYarnJobSubmissionParameters();
+
+    final AvroJobSubmissionParameters jobSubmissionParameters =
+        yarnJobSubmissionParameters.getSharedJobSubmissionParameters();
+
+    this.driverFolder = new File(jobSubmissionParameters.getJobSubmissionFolder().toString());
+    this.jobId = jobSubmissionParameters.getJobId().toString();
+    this.tcpBeginPort = jobSubmissionParameters.getTcpBeginPort();
+    this.tcpRangeCount = jobSubmissionParameters.getTcpRangeCount();
+    this.tcpTryCount = jobSubmissionParameters.getTcpTryCount();
+    this.maxApplicationSubmissions = yarnClusterJobSubmissionParameters.getMaxApplicationSubmissions();
+    this.driverRecoveryTimeout = yarnJobSubmissionParameters.getDriverRecoveryTimeout();
+    this.driverMemory = yarnJobSubmissionParameters.getDriverMemory();
+    this.priority = DEFAULT_PRIORITY;
+    this.queue = DEFAULT_QUEUE;
+    this.tokenKind = yarnClusterJobSubmissionParameters.getSecurityTokenKind().toString();
+    this.tokenService = yarnClusterJobSubmissionParameters.getSecurityTokenService().toString();
+    this.jobSubmissionDirectoryPrefix = yarnJobSubmissionParameters.getJobSubmissionDirectoryPrefix().toString();
+
+    Validate.notEmpty(jobId, "The job id is null or empty");
+    Validate.isTrue(driverMemory > 0, "The amount of driver memory given is <= 0.");
+    Validate.isTrue(tcpBeginPort >= 0, "The tcp start port given is < 0.");
+    Validate.isTrue(tcpRangeCount > 0, "The tcp range given is <= 0.");
+    Validate.isTrue(tcpTryCount > 0, "The tcp retry count given is <= 0.");
+    Validate.isTrue(maxApplicationSubmissions > 0, "The maximum number of app submissions given is <= 0.");
+    Validate.notEmpty(queue, "The queue is null or empty");
+    Validate.notEmpty(tokenKind, "Token kind should be either NULL or some custom non empty value");
+    Validate.notEmpty(tokenService, "Token service should be either NULL or some custom non empty value");
+    Validate.notEmpty(jobSubmissionDirectoryPrefix, "Job submission directory prefix should not be empty");
+  }
+
+  @Override
+  public String toString() {
+    return "YarnClusterSubmissionFromCS{" +
+        "driverFolder=" + driverFolder +
+        ", jobId='" + jobId + '\'' +
+        ", driverMemory=" + driverMemory +
+        ", tcpBeginPort=" + tcpBeginPort +
+        ", tcpRangeCount=" + tcpRangeCount +
+        ", tcpTryCount=" + tcpTryCount +
+        ", maxApplicationSubmissions=" + maxApplicationSubmissions +
+        ", driverRecoveryTimeout=" + driverRecoveryTimeout +
+        ", priority=" + priority +
+        ", queue='" + queue + '\'' +
+        ", tokenKind='" + tokenKind + '\'' +
+        ", tokenService='" + tokenService + '\'' +
+        ", jobSubmissionDirectoryPrefix='" + jobSubmissionDirectoryPrefix + '\'' +
+        '}';
+  }
+
+  /**
+   * @return The local folder where the driver is staged.
+   */
+  File getDriverFolder() {
+    return driverFolder;
+  }
+
+  /**
+   * @return the id of the job to be submitted.
+   */
+  String getJobId() {
+    return jobId;
+  }
+
+  /**
+   * @return the amount of memory to allocate for the Driver, in MB.
+   */
+  int getDriverMemory() {
+    return driverMemory;
+  }
+
+  /**
+   * @return The priority of the job submission
+   */
+  int getPriority() {
+    return priority;
+  }
+
+  /**
+   * @return The queue the driver will be submitted to.
+   */
+  String getQueue() {
+    return queue;
+  }
+
+  /**
+   * @return The security token kind
+   */
+  String getTokenKind() {
+    return tokenKind;
+  }
+
+  /**
+   * @return The security token service
+   */
+  String getTokenService() {
+    return tokenService;
+  }
+
+  /**
+   * @return The max amount of times the application can be submitted.
+   */
+  int getMaxApplicationSubmissions(){
+    return maxApplicationSubmissions;
+  }
+
+  /**
+   * @return The time allowed for Driver recovery to recover all its Evaluators.
+   */
+  int getDriverRecoveryTimeout() {
+    return driverRecoveryTimeout;
+  }
+
+  /**
+   * @return The submission parameters for YARN jobs.
+   */
+  AvroYarnJobSubmissionParameters getYarnJobSubmissionParameters() {
+    return yarnJobSubmissionParameters;
+  }
+
+  /**
+   * Takes the YARN cluster job submission configuration file, deserializes it, and creates submission object.
+   */
+  static YarnClusterSubmissionFromCS fromJobSubmissionParametersFile(final File yarnClusterJobSubmissionParametersFile)
+      throws IOException {
+    try (final FileInputStream fileInputStream = new FileInputStream(yarnClusterJobSubmissionParametersFile)) {
+      // this is mainly a test hook
+      return readYarnClusterSubmissionFromCSFromInputStream(fileInputStream);
+    }
+  }
+
+  static YarnClusterSubmissionFromCS readYarnClusterSubmissionFromCSFromInputStream(
+      final InputStream inputStream) throws IOException {
+    final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
+        AvroYarnClusterJobSubmissionParameters.getClassSchema(), inputStream);
+    final SpecificDatumReader<AvroYarnClusterJobSubmissionParameters> reader = new SpecificDatumReader<>(
+        AvroYarnClusterJobSubmissionParameters.class);
+    final AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters = reader.read(null, decoder);
+    return new YarnClusterSubmissionFromCS(yarnClusterJobSubmissionParameters);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnDriverConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnDriverConfigurationGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnDriverConfigurationGenerator.java
deleted file mode 100644
index 2c91b78..0000000
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnDriverConfigurationGenerator.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.bridge.client;
-
-import org.apache.reef.client.DriverRestartConfiguration;
-import org.apache.reef.client.parameters.DriverConfigurationProviders;
-import org.apache.reef.javabridge.generic.JobDriver;
-import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
-import org.apache.reef.runtime.common.files.REEFFileNames;
-import org.apache.reef.runtime.yarn.driver.JobSubmissionDirectoryProvider;
-import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
-import org.apache.reef.runtime.yarn.driver.YarnDriverRestartConfiguration;
-import org.apache.reef.tang.*;
-import org.apache.reef.tang.annotations.Name;
-import org.apache.reef.tang.annotations.NamedParameter;
-import org.apache.reef.tang.annotations.Parameter;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.tang.formats.ConfigurationSerializer;
-
-import javax.inject.Inject;
-import java.io.File;
-import java.io.IOException;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Does client side manipulation of driver configuration for YARN runtime.
- */
-final class YarnDriverConfigurationGenerator {
-  private static final Logger LOG = Logger.getLogger(YarnDriverConfigurationGenerator.class.getName());
-  private final Set<ConfigurationProvider> configurationProviders;
-  private final int driverRestartEvaluatorRecoverySeconds;
-  private final REEFFileNames fileNames;
-  private final ConfigurationSerializer configurationSerializer;
-
-  @Inject
-  private YarnDriverConfigurationGenerator(@Parameter(DriverConfigurationProviders.class)
-                                   final Set<ConfigurationProvider> configurationProviders,
-                                   @Parameter(SubmissionDriverRestartEvaluatorRecoverySeconds.class)
-                                   final int driverRestartEvaluatorRecoverySeconds,
-                                   final ConfigurationSerializer configurationSerializer,
-                                   final REEFFileNames fileNames) {
-    this.configurationProviders = configurationProviders;
-    this.driverRestartEvaluatorRecoverySeconds = driverRestartEvaluatorRecoverySeconds;
-    this.fileNames = fileNames;
-    this.configurationSerializer = configurationSerializer;
-  }
-
-  /**
-   * Writes driver configuration to disk.
-   * @param driverFolder the folder containing the `reef` folder. Only that `reef` folder will be in the JAR.
-   * @param jobId id of the job to be submitted
-   * @param jobSubmissionFolder job submission folder on DFS
-   * @return the prepared driver configuration
-   * @throws IOException
-   */
-  public Configuration writeConfiguration(final File driverFolder,
-                                          final String jobId,
-                                          final String jobSubmissionFolder) throws IOException {
-    final File driverConfigurationFile = new File(driverFolder, this.fileNames.getDriverConfigurationPath());
-    final Configuration yarnDriverConfiguration = YarnDriverConfiguration.CONF
-        .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, jobSubmissionFolder)
-        .set(YarnDriverConfiguration.JOB_IDENTIFIER, jobId)
-        .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE)
-        .set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0)
-        .build();
-
-    final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder();
-    for (final ConfigurationProvider configurationProvider : this.configurationProviders) {
-      configurationBuilder.addConfiguration(configurationProvider.getConfiguration());
-    }
-    final Configuration providedConfigurations =  configurationBuilder.build();
-
-    Configuration driverConfiguration = Configurations.merge(
-        Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER,
-        yarnDriverConfiguration,
-        providedConfigurations);
-
-    if (driverRestartEvaluatorRecoverySeconds > 0) {
-      LOG.log(Level.FINE, "Driver restart is enabled.");
-
-      final Configuration yarnDriverRestartConfiguration =
-          YarnDriverRestartConfiguration.CONF.build();
-
-      final Configuration driverRestartConfiguration =
-          DriverRestartConfiguration.CONF
-          .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED, JobDriver.RestartHandler.class)
-          .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE,
-            JobDriver.DriverRestartActiveContextHandler.class)
-          .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
-            JobDriver.DriverRestartRunningTaskHandler.class)
-          .set(DriverRestartConfiguration.DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS,
-            driverRestartEvaluatorRecoverySeconds)
-          .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED,
-            JobDriver.DriverRestartCompletedHandler.class)
-          .set(DriverRestartConfiguration.ON_DRIVER_RESTART_EVALUATOR_FAILED,
-            JobDriver.DriverRestartFailedEvaluatorHandler.class)
-          .build();
-
-      driverConfiguration = Configurations.merge(
-        driverConfiguration, yarnDriverRestartConfiguration, driverRestartConfiguration);
-    }
-
-    this.configurationSerializer.toFile(driverConfiguration, driverConfigurationFile);
-    return driverConfiguration;
-  }
-
-  /**
-   * This main is executed from .NET to perform driver config generation.
-   * For arguments detail:
-   * @see org.apache.reef.bridge.client.YarnSubmissionFromCS#fromJobSubmissionParametersFile(File)
-   */
-  public static void main(final String[] args) throws InjectionException, IOException {
-    final File jobSubmissionParametersFile = new File(args[0]);
-    if (!(jobSubmissionParametersFile.exists() && jobSubmissionParametersFile.canRead())) {
-      throw new IOException("Unable to open and read " + jobSubmissionParametersFile.getAbsolutePath());
-    }
-
-    final YarnSubmissionFromCS yarnSubmission =
-        YarnSubmissionFromCS.fromJobSubmissionParametersFile(jobSubmissionParametersFile);
-
-    LOG.log(Level.INFO, "YARN driver config generation received from C#: {0}", yarnSubmission);
-    final Configuration yarnConfiguration = yarnSubmission.getRuntimeConfiguration();
-
-    final Injector injector = Tang.Factory.getTang().newInjector(yarnConfiguration);
-    final YarnDriverConfigurationGenerator yarnClientConfigurationGenerator =
-        injector.getInstance(YarnDriverConfigurationGenerator.class);
-    final JobSubmissionDirectoryProvider directoryProvider = injector.getInstance(JobSubmissionDirectoryProvider.class);
-    final String jobId = yarnSubmission.getJobId();
-
-    LOG.log(Level.INFO, "Writing driver config for job {0}", jobId);
-    yarnClientConfigurationGenerator.writeConfiguration(
-        yarnSubmission.getDriverFolder(), jobId, directoryProvider.getJobSubmissionDirectoryPath(jobId).toString());
-    System.exit(0);
-    LOG.log(Level.INFO, "End of main in Java YarnDriverConfigurationGenerator");
-  }
-}
-
-/**
- * How long the driver should wait before timing out on evaluator
- * recovery in seconds. Defaults to -1. If value is negative, the restart functionality will not be
- * enabled. Only used by .NET job submission.
- */
-@NamedParameter(doc = "How long the driver should wait before timing out on evaluator" +
-    " recovery in seconds. Defaults to -1. If value is negative, the restart functionality will not be" +
-    " enabled. Only used by .NET job submission.", default_value = "-1")
-final class SubmissionDriverRestartEvaluatorRecoverySeconds implements Name<Integer> {
-  private SubmissionDriverRestartEvaluatorRecoverySeconds() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index b1d0c8b..93e3e92 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -18,6 +18,7 @@
  */
 package org.apache.reef.bridge.client;
 
+import org.apache.commons.lang.Validate;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -27,21 +28,20 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.reef.driver.parameters.MaxApplicationSubmissions;
-import org.apache.reef.driver.parameters.ResourceManagerPreserveEvaluators;
 import org.apache.reef.runtime.common.files.ClasspathProvider;
 import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
 import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix;
+import org.apache.reef.runtime.yarn.YarnClasspathProvider;
 import org.apache.reef.runtime.yarn.client.SecurityTokenProvider;
 import org.apache.reef.runtime.yarn.client.YarnSubmissionHelper;
 import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
 import org.apache.reef.runtime.yarn.client.uploader.JobUploader;
+import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor;
 import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.tang.formats.ConfigurationSerializer;
 import org.apache.reef.util.JARFileMaker;
 
 import javax.inject.Inject;
@@ -49,6 +49,7 @@ import java.io.*;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -61,36 +62,29 @@ public final class YarnJobSubmissionClient {
 
   private static final Logger LOG = Logger.getLogger(YarnJobSubmissionClient.class.getName());
   private final JobUploader uploader;
-  private final ConfigurationSerializer configurationSerializer;
   private final REEFFileNames fileNames;
   private final YarnConfiguration yarnConfiguration;
   private final ClasspathProvider classpath;
-  private final int maxApplicationSubmissions;
   private final SecurityTokenProvider tokenProvider;
   private final List<String> commandPrefixList;
-  private final YarnDriverConfigurationGenerator configurationGenerator;
+  private final YarnJobSubmissionParametersFileGenerator jobSubmissionParametersGenerator;
 
   @Inject
   YarnJobSubmissionClient(final JobUploader uploader,
                           final YarnConfiguration yarnConfiguration,
-                          final ConfigurationSerializer configurationSerializer,
                           final REEFFileNames fileNames,
                           final ClasspathProvider classpath,
-                          @Parameter(MaxApplicationSubmissions.class)
-                          final int maxApplicationSubmissions,
                           @Parameter(DriverLaunchCommandPrefix.class)
                           final List<String> commandPrefixList,
                           final SecurityTokenProvider tokenProvider,
-                          final YarnDriverConfigurationGenerator configurationGenerator) {
+                          final YarnJobSubmissionParametersFileGenerator jobSubmissionParametersGenerator) {
     this.uploader = uploader;
-    this.configurationSerializer = configurationSerializer;
     this.fileNames = fileNames;
     this.yarnConfiguration = yarnConfiguration;
     this.classpath = classpath;
-    this.maxApplicationSubmissions = maxApplicationSubmissions;
     this.tokenProvider = tokenProvider;
     this.commandPrefixList = commandPrefixList;
-    this.configurationGenerator = configurationGenerator;
+    this.jobSubmissionParametersGenerator = jobSubmissionParametersGenerator;
   }
 
   /**
@@ -99,6 +93,7 @@ public final class YarnJobSubmissionClient {
    * @throws IOException
    */
   private File makeJar(final File driverFolder) throws IOException {
+    Validate.isTrue(driverFolder.exists());
     final File jarFile = new File(driverFolder.getParentFile(), driverFolder.getName() + ".jar");
     final File reefFolder = new File(driverFolder, fileNames.getREEFFolderName());
     if (!reefFolder.isDirectory()) {
@@ -109,7 +104,7 @@ public final class YarnJobSubmissionClient {
     return jarFile;
   }
 
-  private void launch(final YarnSubmissionFromCS yarnSubmission) throws IOException, YarnException {
+  private void launch(final YarnClusterSubmissionFromCS yarnSubmission) throws IOException, YarnException {
     // ------------------------------------------------------------------------
     // Get an application ID
     try (final YarnSubmissionHelper submissionHelper =
@@ -118,10 +113,7 @@ public final class YarnJobSubmissionClient {
       // ------------------------------------------------------------------------
       // Prepare the JAR
       final JobFolder jobFolderOnDFS = this.uploader.createJobFolder(submissionHelper.getApplicationId());
-      final Configuration jobSubmissionConfiguration =
-          this.configurationGenerator.writeConfiguration(yarnSubmission.getDriverFolder(),
-            yarnSubmission.getJobId(),
-            jobFolderOnDFS.getPath().toString());
+      this.jobSubmissionParametersGenerator.writeConfiguration(yarnSubmission, jobFolderOnDFS);
       final File jarFile = makeJar(yarnSubmission.getDriverFolder());
       LOG.log(Level.INFO, "Created job submission jar file: {0}", jarFile);
 
@@ -131,29 +123,26 @@ public final class YarnJobSubmissionClient {
       final LocalResource jarFileOnDFS = jobFolderOnDFS.uploadAsLocalResource(jarFile);
       LOG.info("Uploaded job submission JAR");
 
-      final Injector jobParamsInjector = Tang.Factory.getTang().newInjector(jobSubmissionConfiguration);
-
       // ------------------------------------------------------------------------
       // Submit
-      try {
-        submissionHelper
-            .addLocalResource(this.fileNames.getREEFFolderName(), jarFileOnDFS)
-            .setApplicationName(yarnSubmission.getJobId())
-            .setDriverMemory(yarnSubmission.getDriverMemory())
-            .setPriority(yarnSubmission.getPriority())
-            .setQueue(yarnSubmission.getQueue())
-            .setMaxApplicationAttempts(this.maxApplicationSubmissions)
-            .setPreserveEvaluators(jobParamsInjector.getNamedInstance(ResourceManagerPreserveEvaluators.class))
-            .submit();
-      } catch (InjectionException ie) {
-        throw new RuntimeException("Unable to submit job due to " + ie);
-      }
+      submissionHelper
+          .addLocalResource(this.fileNames.getREEFFolderName(), jarFileOnDFS)
+          .setApplicationName(yarnSubmission.getJobId())
+          .setDriverMemory(yarnSubmission.getDriverMemory())
+          .setPriority(yarnSubmission.getPriority())
+          .setQueue(yarnSubmission.getQueue())
+          .setMaxApplicationAttempts(yarnSubmission.getMaxApplicationSubmissions())
+          .setPreserveEvaluators(yarnSubmission.getDriverRecoveryTimeout() > 0)
+          .setLauncherClass(YarnBootstrapREEFLauncher.class)
+          .setConfigurationFileName(fileNames.getYarnBootstrapParamFilePath())
+          .submit();
       writeDriverHttpEndPoint(yarnSubmission.getDriverFolder(),
           submissionHelper.getStringApplicationId(), jobFolderOnDFS.getPath());
     }
   }
 
-  private static void writeSecurityTokenToUserCredential(final YarnSubmissionFromCS yarnSubmission) throws IOException {
+  private static void writeSecurityTokenToUserCredential(
+      final YarnClusterSubmissionFromCS yarnSubmission) throws IOException {
     final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
     final REEFFileNames fileNames = new REEFFileNames();
     final String securityTokenIdentifierFile = fileNames.getSecurityTokenIdentifierFile();
@@ -229,7 +218,7 @@ public final class YarnJobSubmissionClient {
   /**
    * .NET client calls into this main method for job submission.
    * For arguments detail:
-   * @see org.apache.reef.bridge.client.YarnSubmissionFromCS#fromJobSubmissionParametersFile(File)
+   * @see YarnClusterSubmissionFromCS#fromJobSubmissionParametersFile(File)
    */
   public static void main(final String[] args) throws InjectionException, IOException, YarnException {
     final File jobSubmissionParametersFile = new File(args[0]);
@@ -237,8 +226,8 @@ public final class YarnJobSubmissionClient {
       throw new IOException("Unable to open and read " + jobSubmissionParametersFile.getAbsolutePath());
     }
 
-    final YarnSubmissionFromCS yarnSubmission =
-        YarnSubmissionFromCS.fromJobSubmissionParametersFile(jobSubmissionParametersFile);
+    final YarnClusterSubmissionFromCS yarnSubmission =
+        YarnClusterSubmissionFromCS.fromJobSubmissionParametersFile(jobSubmissionParametersFile);
 
     LOG.log(Level.INFO, "YARN job submission received from C#: {0}", yarnSubmission);
     if (!yarnSubmission.getTokenKind().equalsIgnoreCase("NULL")) {
@@ -250,11 +239,20 @@ public final class YarnJobSubmissionClient {
       LOG.log(Level.FINE, "Did not find security token");
     }
 
-    final Configuration yarnConfiguration = yarnSubmission.getRuntimeConfiguration();
-    final YarnJobSubmissionClient client = Tang.Factory.getTang()
-        .newInjector(yarnConfiguration)
+    final List<String> launchCommandPrefix = new ArrayList<String>() {{
+          add(new REEFFileNames().getDriverLauncherExeFile().toString());
+      }};
+
+    final Configuration yarnJobSubmissionClientConfig = Tang.Factory.getTang().newConfigurationBuilder()
+        .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class)
+        .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class)
+        .bindList(DriverLaunchCommandPrefix.class, launchCommandPrefix)
+        .build();
+    final YarnJobSubmissionClient client = Tang.Factory.getTang().newInjector(yarnJobSubmissionClientConfig)
         .getInstance(YarnJobSubmissionClient.class);
+
     client.launch(yarnSubmission);
+
     LOG.log(Level.INFO, "Returned from launch in Java YarnJobSubmissionClient");
     System.exit(0);
     LOG.log(Level.INFO, "End of main in Java YarnJobSubmissionClient");

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionParametersFileGenerator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionParametersFileGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionParametersFileGenerator.java
new file mode 100644
index 0000000..ccb5e8b
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionParametersFileGenerator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.io.JsonEncoder;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.logging.Logger;
+
+/**
+ * Does client side manipulation of driver configuration for YARN runtime.
+ */
+final class YarnJobSubmissionParametersFileGenerator {
+  private static final Logger LOG = Logger.getLogger(YarnJobSubmissionParametersFileGenerator.class.getName());
+  private final REEFFileNames fileNames;
+
+  @Inject
+  private YarnJobSubmissionParametersFileGenerator(final REEFFileNames fileNames) {
+    this.fileNames = fileNames;
+  }
+
+  /**
+   * Writes driver configuration to disk.
+   * @param yarnClusterSubmissionFromCS the information needed to submit encode YARN parameters and create the
+   *                                    YARN job for submission from the cluster.
+   * @throws IOException
+   */
+  public void writeConfiguration(final YarnClusterSubmissionFromCS yarnClusterSubmissionFromCS,
+                                 final JobFolder jobFolderOnDFS) throws IOException {
+    final File yarnParametersFile = new File(yarnClusterSubmissionFromCS.getDriverFolder(),
+        fileNames.getYarnBootstrapParamFilePath());
+
+    try (final FileOutputStream fileOutputStream = new FileOutputStream(yarnParametersFile)) {
+      // this is mainly a test hook.
+      writeAvroYarnJobSubmissionParametersToOutputStream(
+          yarnClusterSubmissionFromCS, jobFolderOnDFS.getPath().toString(), fileOutputStream);
+    }
+  }
+
+  static void writeAvroYarnJobSubmissionParametersToOutputStream(
+      final YarnClusterSubmissionFromCS yarnClusterSubmissionFromCS,
+      final String jobFolderOnDFSPath,
+      final OutputStream outputStream) throws IOException {
+    final DatumWriter<AvroYarnJobSubmissionParameters> datumWriter =
+        new SpecificDatumWriter<>(AvroYarnJobSubmissionParameters.class);
+
+    final AvroYarnJobSubmissionParameters jobSubmissionParameters =
+        yarnClusterSubmissionFromCS.getYarnJobSubmissionParameters();
+    jobSubmissionParameters.setDfsJobSubmissionFolder(jobFolderOnDFSPath);
+    final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(jobSubmissionParameters.getSchema(),
+        outputStream);
+    datumWriter.write(jobSubmissionParameters, encoder);
+    encoder.flush();
+    outputStream.flush();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
deleted file mode 100644
index e8c5674..0000000
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.bridge.client;
-
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.JsonDecoder;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.lang.Validate;
-import org.apache.reef.client.parameters.DriverConfigurationProviders;
-import org.apache.reef.driver.parameters.MaxApplicationSubmissions;
-import org.apache.reef.io.TcpPortConfigurationProvider;
-import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters;
-import org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters;
-import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
-import org.apache.reef.runtime.common.files.REEFFileNames;
-import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix;
-import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
-import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Configurations;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Represents a job submission from the CS code.
- * <p>
- * This class exists mostly to parse and validate the command line parameters provided by the C# class
- * `Org.Apache.REEF.Client.YARN.YARNClient`
- */
-final class YarnSubmissionFromCS {
-  private static final int DEFAULT_PRIORITY = 1;
-  private static final String DEFAULT_QUEUE = "default";
-
-  private final File driverFolder;
-  private final String jobId;
-  private final int driverMemory;
-  private final int tcpBeginPort;
-  private final int tcpRangeCount;
-  private final int tcpTryCount;
-  private final int maxApplicationSubmissions;
-  private final int driverRecoveryTimeout;
-
-  // Static for now
-  private final int priority;
-  private final String queue;
-  private final String tokenKind;
-  private final String tokenService;
-  private final String jobSubmissionDirectoryPrefix;
-
-  private YarnSubmissionFromCS(final AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters) {
-    final AvroYarnJobSubmissionParameters yarnJobSubmissionParameters =
-        yarnClusterJobSubmissionParameters.getYarnJobSubmissionParameters();
-
-    final AvroJobSubmissionParameters jobSubmissionParameters =
-        yarnJobSubmissionParameters.getSharedJobSubmissionParameters();
-
-    this.driverFolder = new File(jobSubmissionParameters.getJobSubmissionFolder().toString());
-    this.jobId = jobSubmissionParameters.getJobId().toString();
-    this.tcpBeginPort = jobSubmissionParameters.getTcpBeginPort();
-    this.tcpRangeCount = jobSubmissionParameters.getTcpRangeCount();
-    this.tcpTryCount = jobSubmissionParameters.getTcpTryCount();
-    this.maxApplicationSubmissions = yarnClusterJobSubmissionParameters.getMaxApplicationSubmissions();
-    this.driverRecoveryTimeout = yarnJobSubmissionParameters.getDriverRecoveryTimeout();
-    this.driverMemory = yarnJobSubmissionParameters.getDriverMemory();
-    this.priority = DEFAULT_PRIORITY;
-    this.queue = DEFAULT_QUEUE;
-    this.tokenKind = yarnClusterJobSubmissionParameters.getSecurityTokenKind().toString();
-    this.tokenService = yarnClusterJobSubmissionParameters.getSecurityTokenService().toString();
-    this.jobSubmissionDirectoryPrefix = yarnJobSubmissionParameters.getJobSubmissionDirectoryPrefix().toString();
-
-    Validate.isTrue(driverFolder.exists(), "The driver folder given does not exist.");
-    Validate.notEmpty(jobId, "The job id is null or empty");
-    Validate.isTrue(driverMemory > 0, "The amount of driver memory given is <= 0.");
-    Validate.isTrue(tcpBeginPort >= 0, "The tcp start port given is < 0.");
-    Validate.isTrue(tcpRangeCount > 0, "The tcp range given is <= 0.");
-    Validate.isTrue(tcpTryCount > 0, "The tcp retry count given is <= 0.");
-    Validate.isTrue(maxApplicationSubmissions > 0, "The maximum number of app submissions given is <= 0.");
-    Validate.notEmpty(queue, "The queue is null or empty");
-    Validate.notEmpty(tokenKind, "Token kind should be either NULL or some custom non empty value");
-    Validate.notEmpty(tokenService, "Token service should be either NULL or some custom non empty value");
-    Validate.notEmpty(jobSubmissionDirectoryPrefix, "Job submission directory prefix should not be empty");
-  }
-
-  @Override
-  public String toString() {
-    return "YarnSubmissionFromCS{" +
-        "driverFolder=" + driverFolder +
-        ", jobId='" + jobId + '\'' +
-        ", driverMemory=" + driverMemory +
-        ", tcpBeginPort=" + tcpBeginPort +
-        ", tcpRangeCount=" + tcpRangeCount +
-        ", tcpTryCount=" + tcpTryCount +
-        ", maxApplicationSubmissions=" + maxApplicationSubmissions +
-        ", driverRecoveryTimeout=" + driverRecoveryTimeout +
-        ", priority=" + priority +
-        ", queue='" + queue + '\'' +
-        ", tokenKind='" + tokenKind + '\'' +
-        ", tokenService='" + tokenService + '\'' +
-        ", jobSubmissionDirectoryPrefix='" + jobSubmissionDirectoryPrefix + '\'' +
-        '}';
-  }
-
-  /**
-   * Produces the YARN Runtime Configuration based on the parameters passed from C#.
-   *
-   * @return the YARN Runtime Configuration based on the parameters passed from C#.
-   */
-  Configuration getRuntimeConfiguration() {
-    final Configuration providerConfig = Tang.Factory.getTang().newConfigurationBuilder()
-        .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class)
-        .bindNamedParameter(TcpPortRangeBegin.class, Integer.toString(tcpBeginPort))
-        .bindNamedParameter(TcpPortRangeCount.class, Integer.toString(tcpRangeCount))
-        .bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(tcpTryCount))
-        .bindNamedParameter(JobSubmissionDirectoryPrefix.class, jobSubmissionDirectoryPrefix)
-        .build();
-
-    final List<String> driverLaunchCommandPrefixList = new ArrayList<>();
-    driverLaunchCommandPrefixList.add(new REEFFileNames().getDriverLauncherExeFile().toString());
-
-    final Configuration yarnJobSubmissionClientParamsConfig = Tang.Factory.getTang().newConfigurationBuilder()
-        .bindNamedParameter(SubmissionDriverRestartEvaluatorRecoverySeconds.class,
-            Integer.toString(driverRecoveryTimeout))
-        .bindNamedParameter(MaxApplicationSubmissions.class, Integer.toString(maxApplicationSubmissions))
-        .bindList(DriverLaunchCommandPrefix.class, driverLaunchCommandPrefixList)
-        .build();
-
-    return Configurations.merge(YarnClientConfiguration.CONF.build(), providerConfig,
-        yarnJobSubmissionClientParamsConfig);
-  }
-
-  /**
-   * @return The local folder where the driver is staged.
-   */
-  File getDriverFolder() {
-    return driverFolder;
-  }
-
-  /**
-   * @return the id of the job to be submitted.
-   */
-  String getJobId() {
-    return jobId;
-  }
-
-  /**
-   * @return the amount of memory to allocate for the Driver, in MB.
-   */
-  int getDriverMemory() {
-    return driverMemory;
-  }
-
-  /**
-   * @return The priority of the job submission
-   */
-  int getPriority() {
-    return priority;
-  }
-
-  /**
-   * @return The queue the driver will be submitted to.
-   */
-  String getQueue() {
-    return queue;
-  }
-
-  /**
-   * @return The security token kind
-   */
-  String getTokenKind() {
-    return tokenKind;
-  }
-
-  /**
-   * @return The security token service
-   */
-  String getTokenService() {
-    return tokenService;
-  }
-
-  /**
-   * Takes the YARN cluster job submission configuration file, deserializes it, and creates submission object.
-   */
-  static YarnSubmissionFromCS fromJobSubmissionParametersFile(final File yarnClusterJobSubmissionParametersFile)
-      throws IOException {
-    try (final FileInputStream fileInputStream = new FileInputStream(yarnClusterJobSubmissionParametersFile)) {
-      final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
-          AvroYarnClusterJobSubmissionParameters.getClassSchema(), fileInputStream);
-      final SpecificDatumReader<AvroYarnClusterJobSubmissionParameters> reader = new SpecificDatumReader<>(
-          AvroYarnClusterJobSubmissionParameters.class);
-      final AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters = reader.read(null, decoder);
-
-      return new YarnSubmissionFromCS(yarnClusterJobSubmissionParameters);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
new file mode 100644
index 0000000..ba930d4
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters;
+import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
+import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
+import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
+import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount;
+import org.junit.Test;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Tests for generating Driver configuration by bootstrapping the process to the
+ * {@link YarnBootstrapREEFLauncher}. Tests the Avro job submission serialization
+ * deserialization from C# using mocked, expected Strings.
+ */
+public final class TestAvroJobSubmissionParametersSerializationFromCS {
+  private static final String NULL_REP = "NULL";
+  private static final String STRING_REP = "HelloREEF";
+  private static final String STRING_REP_QUOTED = "\"" + STRING_REP + "\"";
+  private static final long NUMBER_REP = 12345;
+  private static final String AVRO_YARN_PARAMETERS_SERIALIZED_STRING =
+      "{" +
+          "\"sharedJobSubmissionParameters\":" +
+          "{" +
+            "\"jobId\":" + STRING_REP_QUOTED + "," +
+            "\"tcpBeginPort\":" + NUMBER_REP + "," +
+            "\"tcpRangeCount\":" + NUMBER_REP + "," +
+            "\"tcpTryCount\":" + NUMBER_REP + "," +
+            "\"jobSubmissionFolder\":" + STRING_REP_QUOTED +
+          "}," +
+          "\"driverMemory\":" + NUMBER_REP + "," +
+          "\"driverRecoveryTimeout\":" + NUMBER_REP + "," +
+          "\"dfsJobSubmissionFolder\":" +
+          "{" +
+            "\"string\": " + STRING_REP_QUOTED +
+          "}," +
+          "\"jobSubmissionDirectoryPrefix\":" + STRING_REP_QUOTED +
+      "}";
+
+  private static final String AVRO_YARN_CLUSTER_PARAMETERS_SERIALIZED_STRING =
+      "{" +
+          "\"yarnJobSubmissionParameters\":" + AVRO_YARN_PARAMETERS_SERIALIZED_STRING + "," +
+          "\"maxApplicationSubmissions\":" + NUMBER_REP + "," +
+          "\"securityTokenKind\":\"" + NULL_REP + "\"," +
+          "\"securityTokenService\":\"" + NULL_REP + "\"" +
+      "}";
+
+  /**
+   * Tests deserialization of the Avro parameters for submission from the cluster from C#.
+   * @throws IOException
+   */
+  @Test
+  public void testAvroYarnClusterParametersDeserialization() throws IOException {
+    final YarnClusterSubmissionFromCS yarnClusterSubmissionFromCS = createYarnClusterSubmissionFromCS();
+    assert yarnClusterSubmissionFromCS.getDriverFolder().equals(new File(STRING_REP));
+    assert yarnClusterSubmissionFromCS.getDriverMemory() == NUMBER_REP;
+    assert yarnClusterSubmissionFromCS.getDriverRecoveryTimeout() == NUMBER_REP;
+    assert yarnClusterSubmissionFromCS.getJobId().equals(STRING_REP);
+    assert yarnClusterSubmissionFromCS.getMaxApplicationSubmissions() == NUMBER_REP;
+    assert yarnClusterSubmissionFromCS.getTokenKind().equals(NULL_REP);
+    assert yarnClusterSubmissionFromCS.getTokenService().equals(NULL_REP);
+
+    verifyYarnJobSubmissionParams(yarnClusterSubmissionFromCS.getYarnJobSubmissionParameters());
+  }
+
+  /**
+   * Tests deserialization of the Avro parameters for YARN from C#.
+   * @throws IOException
+   */
+  @Test
+  public void testAvroYarnParametersDeserialization() throws IOException {
+    verifyYarnJobSubmissionParams(createAvroYarnJobSubmissionParameters());
+  }
+
+  /**
+   * Tests a round-trip serialization deserialization process of the Avro parameters from C#.
+   * @throws IOException
+   */
+  @Test
+  public void testAvroYarnParametersSerialization() throws IOException {
+    try (final ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+      YarnJobSubmissionParametersFileGenerator.writeAvroYarnJobSubmissionParametersToOutputStream(
+          createYarnClusterSubmissionFromCS(), STRING_REP, outputStream);
+      final byte[] content = outputStream.toByteArray();
+      try (final InputStream stream = new ByteArrayInputStream(content)) {
+        verifyYarnJobSubmissionParams(
+            YarnBootstrapDriverConfigGenerator.readYarnJobSubmissionParametersFromInputStream(stream));
+      }
+    }
+  }
+
+  /**
+   * Tests that the DriverConfiguration is bound with Avro parameters from C#.
+   * @throws IOException
+   * @throws InjectionException
+   */
+  @Test
+  public void testYarnBootstrapDriverConfigGenerator() throws IOException, InjectionException {
+    final Configuration yarnBootstrapDriverConfig =
+        YarnBootstrapDriverConfigGenerator.getYarnDriverConfiguration(createAvroYarnJobSubmissionParameters());
+    final Injector injector = Tang.Factory.getTang().newInjector(yarnBootstrapDriverConfig);
+
+    assert injector.getNamedInstance(JobSubmissionDirectory.class).equals(STRING_REP);
+    assert injector.getNamedInstance(JobSubmissionDirectoryPrefix.class).equals(STRING_REP);
+    assert injector.getNamedInstance(JobIdentifier.class).equals(STRING_REP);
+    assert injector.getNamedInstance(TcpPortRangeBegin.class) == NUMBER_REP;
+    assert injector.getNamedInstance(TcpPortRangeCount.class) == NUMBER_REP;
+    assert injector.getNamedInstance(TcpPortRangeTryCount.class) == NUMBER_REP;
+  }
+
+  private static AvroYarnJobSubmissionParameters createAvroYarnJobSubmissionParameters() throws IOException {
+    try (final InputStream stream =
+             new ByteArrayInputStream(AVRO_YARN_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) {
+      return YarnBootstrapDriverConfigGenerator.readYarnJobSubmissionParametersFromInputStream(stream);
+    }
+  }
+
+  private static YarnClusterSubmissionFromCS createYarnClusterSubmissionFromCS() throws IOException {
+    try (final InputStream stream =
+             new ByteArrayInputStream(
+                 AVRO_YARN_CLUSTER_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) {
+      return YarnClusterSubmissionFromCS.readYarnClusterSubmissionFromCSFromInputStream(stream);
+    }
+  }
+
+  private static void verifyYarnJobSubmissionParams(final AvroYarnJobSubmissionParameters jobSubmissionParameters) {
+    final AvroJobSubmissionParameters sharedJobSubmissionParams =
+        jobSubmissionParameters.getSharedJobSubmissionParameters();
+    assert sharedJobSubmissionParams.getJobId().toString().equals(STRING_REP);
+    assert sharedJobSubmissionParams.getJobSubmissionFolder().toString().equals(STRING_REP);
+    assert sharedJobSubmissionParams.getTcpBeginPort() == NUMBER_REP;
+    assert sharedJobSubmissionParams.getTcpRangeCount() == NUMBER_REP;
+    assert sharedJobSubmissionParams.getTcpTryCount() == NUMBER_REP;
+    assert jobSubmissionParameters.getDfsJobSubmissionFolder().toString().equals(STRING_REP);
+    assert jobSubmissionParameters.getJobSubmissionDirectoryPrefix().toString().equals(STRING_REP);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/package-info.java b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/package-info.java
new file mode 100644
index 0000000..263d4cc
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Tests for the bridge client.
+ */
+package org.apache.reef.bridge.client;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
index 878e9b2..55452a4 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
@@ -43,7 +43,6 @@ import javax.inject.Inject;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -191,36 +190,6 @@ public final class REEFLauncher {
     }
   }
 
-  /**
-   * Pass values of the properties specified in the propNames array as <code>-D...</code>
-   * command line parameters. Currently used only to pass logging configuration to child JVMs processes.
-   *
-   * @param vargs     List of command line parameters to append to.
-   * @param copyNull  create an empty parameter if the property is missing in current process.
-   * @param propNames property names.
-   */
-  public static void propagateProperties(
-          final Collection<String> vargs, final boolean copyNull, final String... propNames) {
-    for (final String propName : propNames) {
-      final String propValue = System.getProperty(propName);
-      if (propValue == null || propValue.isEmpty()) {
-        if (copyNull) {
-          vargs.add("-D" + propName);
-        }
-      } else {
-        vargs.add(String.format("-D%s=%s", propName, propValue));
-      }
-    }
-  }
-
-  /**
-   * Same as above, but with copyNull == false by default.
-   */
-  public static void propagateProperties(
-          final Collection<String> vargs, final String... propNames) {
-    propagateProperties(vargs, false, propNames);
-  }
-
   private void logVersion() {
     this.reefVersion.logVersion();
   }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
index be25102..5de19c4 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
@@ -51,6 +51,7 @@ public final class REEFFileNames {
   private static final String BRIDGE_EXE_NAME = "Org.Apache.REEF.Bridge.exe";
   private static final String SECURITY_TOKEN_IDENTIFIER_FILE = "SecurityTokenId";
   private static final String SECURITY_TOKEN_PASSWORD_FILE = "SecurityTokenPwd";
+  private static final String YARN_BOOTSTRAP_PARAM_FILE = "yarnparameters.json";
 
   @Inject
   public REEFFileNames() {
@@ -231,4 +232,16 @@ public final class REEFFileNames {
   public String getSecurityTokenPasswordFile() {
     return SECURITY_TOKEN_PASSWORD_FILE;
   }
+
+  /**
+   * @return File name the contains the bootstrap parameters for YARN job submission
+   * without Java dependency.
+   */
+  public String getYarnBootstrapParamFile() {
+    return YARN_BOOTSTRAP_PARAM_FILE;
+  }
+
+  public String getYarnBootstrapParamFilePath() {
+    return LOCAL_FOLDER_PATH + '/' + getYarnBootstrapParamFile();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
index 9484249..e6f74ab 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
@@ -46,23 +46,35 @@ public final class JavaLaunchCommandBuilder implements LaunchCommandBuilder {
   private String javaPath = null;
   private String classPath = null;
   private Boolean assertionsEnabled = null;
-  private Map<String, JVMOption> options = new HashMap<>();
+  private final Map<String, JVMOption> options = new HashMap<>();
   private final List<String> commandPrefixList;
+  private final Class launcherClass;
 
   /**
-   * Constructor that populates default options.
+   * Constructor that populates default options, using the default Launcher
+   * class {@link REEFLauncher}.
    */
   public JavaLaunchCommandBuilder() {
-    this(null);
+    this(REEFLauncher.class, null);
   }
 
   /**
-   * Constructor that populates prefix.
+   * Constructor that uses the default Launcher class, {@link REEFLauncher}.
+   * @param commandPrefixList
    */
   public JavaLaunchCommandBuilder(final List<String> commandPrefixList) {
+    this(REEFLauncher.class, commandPrefixList);
+  }
+
+  /**
+   * Constructor that populates prefix and uses a custom Launcher class.
+   */
+  public JavaLaunchCommandBuilder(final Class launcherClass, final List<String> commandPrefixList) {
     for (final String defaultOption : DEFAULT_OPTIONS) {
       addOption(defaultOption);
     }
+
+    this.launcherClass = launcherClass;
     this.commandPrefixList = commandPrefixList;
   }
 
@@ -95,11 +107,11 @@ public final class JavaLaunchCommandBuilder implements LaunchCommandBuilder {
           add(classPath);
         }
 
-        REEFLauncher.propagateProperties(this, true, "proc_reef");
-        REEFLauncher.propagateProperties(this, false,
+        propagateProperties(this, true, "proc_reef");
+        propagateProperties(this, false,
             "java.util.logging.config.file", "java.util.logging.config.class");
 
-        add(REEFLauncher.class.getName());
+        add(launcherClass.getName());
         add(evaluatorConfigurationPath);
 
         if (stdoutPath != null && !stdoutPath.isEmpty()) {
@@ -168,6 +180,28 @@ public final class JavaLaunchCommandBuilder implements LaunchCommandBuilder {
     return addOption(JVMOption.parse(option));
   }
 
+  /**
+   * Pass values of the properties specified in the propNames array as <code>-D...</code>
+   * command line parameters. Currently used only to pass logging configuration to child JVMs processes.
+   *
+   * @param vargs     List of command line parameters to append to.
+   * @param copyNull  create an empty parameter if the property is missing in current process.
+   * @param propNames property names.
+   */
+  private static void propagateProperties(
+      final Collection<String> vargs, final boolean copyNull, final String... propNames) {
+    for (final String propName : propNames) {
+      final String propValue = System.getProperty(propName);
+      if (propValue == null || propValue.isEmpty()) {
+        if (copyNull) {
+          vargs.add("-D" + propName);
+        }
+      } else {
+        vargs.add(String.format("-D%s=%s", propName, propValue));
+      }
+    }
+  }
+
   private JavaLaunchCommandBuilder addOption(final JVMOption jvmOption) {
     if (options.containsKey(jvmOption.option)) {
       LOG.warning("Replaced option " + options.get(jvmOption.option) + " with " + jvmOption);