You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2015/11/16 04:32:20 UTC
[1/2] incubator-reef git commit: [REEF-910] Assemble entire driver
configuration on driver instead of client
Repository: incubator-reef
Updated Branches:
refs/heads/master fc5d9079b -> d55433956
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
index 2fa8602..c424fbc 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.reef.runtime.common.REEFLauncher;
import org.apache.reef.runtime.common.files.ClasspathProvider;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
@@ -54,9 +55,9 @@ public final class YarnSubmissionHelper implements Closeable{
private final REEFFileNames fileNames;
private final ClasspathProvider classpath;
private final SecurityTokenProvider tokenProvider;
- private boolean preserveEvaluators;
- private int maxAppSubmissions;
private final List<String> commandPrefixList;
+ private Class launcherClazz;
+ private String confFileName;
public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
final REEFFileNames fileNames,
@@ -77,10 +78,10 @@ public final class YarnSubmissionHelper implements Closeable{
this.applicationResponse = yarnClientApplication.getNewApplicationResponse();
this.applicationSubmissionContext = yarnClientApplication.getApplicationSubmissionContext();
this.applicationId = applicationSubmissionContext.getApplicationId();
- this.maxAppSubmissions = 1;
- this.preserveEvaluators = false;
this.tokenProvider = tokenProvider;
this.commandPrefixList = commandPrefixList;
+ this.launcherClazz = REEFLauncher.class;
+ this.confFileName = this.fileNames.getDriverConfigurationPath();
LOG.log(Level.FINEST, "YARN Application ID: {0}", applicationId);
}
@@ -201,10 +202,33 @@ public final class YarnSubmissionHelper implements Closeable{
return this;
}
+ /**
+ * Sets the launcher class for the job.
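+ * @param launcherClass the class to use as the launcher when building the launch command
+ * @return this {@code YarnSubmissionHelper}, to allow call chaining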
+ */
+ public YarnSubmissionHelper setLauncherClass(final Class launcherClass) {
+ this.launcherClazz = launcherClass;
+ return this;
+ }
+
+ /**
+ * Sets the configuration file for the job.
+ * Note that this does not have to be the Driver TANG configuration. In the bootstrap
+ * launch case, this can be the Avro file that supports the generation of a driver
+ * configuration file natively at the Launcher.
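+ * @param configurationFileName the name of the configuration file passed to the launch command
+ * @return this {@code YarnSubmissionHelper}, to allow call chaining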
+ */
+ public YarnSubmissionHelper setConfigurationFileName(final String configurationFileName) {
+ this.confFileName = configurationFileName;
+ return this;
+ }
+
public void submit() throws IOException, YarnException {
// SET EXEC COMMAND
- final List<String> launchCommand = new JavaLaunchCommandBuilder(commandPrefixList)
- .setConfigurationFileName(this.fileNames.getDriverConfigurationPath())
+ final List<String> launchCommand = new JavaLaunchCommandBuilder(launcherClazz, commandPrefixList)
+ .setConfigurationFileName(confFileName)
.setClassPath(this.classpath.getDriverClasspath())
.setMemory(this.applicationSubmissionContext.getResource().getMemory())
.setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.fileNames.getDriverStdoutFileName())
[2/2] incubator-reef git commit: [REEF-910] Assemble entire driver
configuration on driver instead of client
Posted by ju...@apache.org.
[REEF-910] Assemble entire driver configuration on driver instead of client
This addresses the issue by:
* Adding the Bootstrap Launcher for YARN, which removes the Java dependency if submitted through REST.
* Modifying DriverLauncher to accommodate other types of launchers.
* Moving YARN submission through .NET into the new two-step process.
JIRA:
[REEF-910](https://issues.apache.org/jira/browse/REEF-910)
This closes #631
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/d5543395
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/d5543395
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/d5543395
Branch: refs/heads/master
Commit: d5543395691ca0baa8a879fb190531ecba56edce
Parents: fc5d907
Author: Andrew Chung <af...@gmail.com>
Authored: Wed Nov 11 17:38:51 2015 -0800
Committer: Julia Wang <ju...@apache.org>
Committed: Sun Nov 15 18:58:23 2015 -0800
----------------------------------------------------------------------
.../Org.Apache.REEF.Bridge/DriverLauncher.cpp | 11 +-
.../AvroYarnClusterJobSubmissionParameters.cs | 4 +-
.../YARN/AvroYarnJobSubmissionParameters.cs | 12 +-
.../Common/DriverFolderPreparationHelper.cs | 2 +-
lang/java/reef-bridge-client/pom.xml | 4 +
.../src/main/avro/JobSubmissionParameters.avsc | 5 +-
.../YarnBootstrapDriverConfigGenerator.java | 145 ++++++++++++
.../client/YarnBootstrapREEFLauncher.java | 71 ++++++
.../client/YarnClusterSubmissionFromCS.java | 202 +++++++++++++++++
.../YarnDriverConfigurationGenerator.java | 167 --------------
.../bridge/client/YarnJobSubmissionClient.java | 80 ++++---
...arnJobSubmissionParametersFileGenerator.java | 82 +++++++
.../bridge/client/YarnSubmissionFromCS.java | 219 -------------------
...SubmissionParametersSerializationFromCS.java | 164 ++++++++++++++
.../apache/reef/bridge/client/package-info.java | 22 ++
.../reef/runtime/common/REEFLauncher.java | 31 ---
.../runtime/common/files/REEFFileNames.java | 13 ++
.../common/launch/JavaLaunchCommandBuilder.java | 48 +++-
.../yarn/client/YarnSubmissionHelper.java | 36 ++-
19 files changed, 838 insertions(+), 480 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/cs/Org.Apache.REEF.Bridge/DriverLauncher.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/DriverLauncher.cpp b/lang/cs/Org.Apache.REEF.Bridge/DriverLauncher.cpp
index d458ae4..f00244b 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/DriverLauncher.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/DriverLauncher.cpp
@@ -49,8 +49,9 @@ const int maxPathBufSize = 16 * 1024;
const char ClassPathSeparatorCharForWindows = ';';
// we look for this to delineate java vm arguments from app arguments.
-// todo: be smarter about this. Accomodate arbitrary apps.
-const char* launcherClass = "org.apache.reef.runtime.common.REEFLauncher";
+// TODO: be smarter about this. Accommodate arbitrary apps that don't
+// contain the term REEFLauncher.
+const char* launcherClassSearchStr = "REEFLauncher";
// method to invoke
const char* JavaMainMethodName = "main";
@@ -89,7 +90,7 @@ void GetCounts(
firstArgOrdinal = -1;
for (int i = firstOptionOrdinal; i < cArgs; i++) {
- if (option && 0 == strcmp(argv[i], launcherClass)) {
+ if (option && NULL != strstr(argv[i], launcherClassSearchStr)) {
option = false;
firstArgOrdinal = i;
}
@@ -100,6 +101,10 @@ void GetCounts(
++argCount;
}
}
+
+ if (firstArgOrdinal < 0) {
+ throw gcnew ArgumentException("Unable to find a REEF Launcher");
+ }
}
//
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
index f31874c..3d09212 100644
--- a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
@@ -28,7 +28,7 @@ namespace Org.Apache.REEF.Client.Avro.YARN
[DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
public sealed class AvroYarnClusterJobSubmissionParameters
{
- private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}}
,{""name"":""driverMemory"",""type"":""int""},{""name"":""driverRecoveryTimeout"",""type"":""int""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}},{""name"":""maxApplicationSubmissions"",""type"":""int""},{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""}]}";
+ private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}}
,{""name"":""driverMemory"",""type"":""int""},{""name"":""driverRecoveryTimeout"",""type"":""int""},{""name"":""dfsJobSubmissionFolder"",""type"":[""null"",""string""]},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}},{""name"":""maxApplicationSubmissions"",""type"":""int""},{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""}]}";
/// <summary>
/// Gets the schema.
@@ -70,6 +70,8 @@ namespace Org.Apache.REEF.Client.Avro.YARN
/// </summary>
public AvroYarnClusterJobSubmissionParameters()
{
+ this.securityTokenKind = "NULL";
+ this.securityTokenService = "NULL";
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs
index a3be9b4..3e966ec 100644
--- a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs
@@ -28,7 +28,7 @@ namespace Org.Apache.REEF.Client.Avro.YARN
[DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
public sealed class AvroYarnJobSubmissionParameters
{
- private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""driverMemory"",""type"":""int""},{""name"":""driverRecoveryTimeout"",""type"":""int""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}";
+ private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""driverMemory"",""type"":""int""},{""name"":""driverRecoveryTimeout"",""type"":""int""},{""name"":""dfsJobSubmissionFolder"",""type"":[""null"",""string""]},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}";
/// <summary>
/// Gets the schema.
@@ -60,6 +60,12 @@ namespace Org.Apache.REEF.Client.Avro.YARN
public int driverRecoveryTimeout { get; set; }
/// <summary>
+ /// Gets or sets the dfsJobSubmissionFolder field.
+ /// </summary>
+ [DataMember]
+ public string dfsJobSubmissionFolder { get; set; }
+
+ /// <summary>
/// Gets or sets the jobSubmissionDirectoryPrefix field.
/// </summary>
[DataMember]
@@ -78,12 +84,14 @@ namespace Org.Apache.REEF.Client.Avro.YARN
/// <param name="sharedJobSubmissionParameters">The sharedJobSubmissionParameters.</param>
/// <param name="driverMemory">The driverMemory.</param>
/// <param name="driverRecoveryTimeout">The driverRecoveryTimeout.</param>
+ /// <param name="dfsJobSubmissionFolder">The dfsJobSubmissionFolder.</param>
/// <param name="jobSubmissionDirectoryPrefix">The jobSubmissionDirectoryPrefix.</param>
- public AvroYarnJobSubmissionParameters(AvroJobSubmissionParameters sharedJobSubmissionParameters, int driverMemory, int driverRecoveryTimeout, string jobSubmissionDirectoryPrefix)
+ public AvroYarnJobSubmissionParameters(AvroJobSubmissionParameters sharedJobSubmissionParameters, int driverMemory, int driverRecoveryTimeout, string dfsJobSubmissionFolder, string jobSubmissionDirectoryPrefix)
{
this.sharedJobSubmissionParameters = sharedJobSubmissionParameters;
this.driverMemory = driverMemory;
this.driverRecoveryTimeout = driverRecoveryTimeout;
+ this.dfsJobSubmissionFolder = dfsJobSubmissionFolder;
this.jobSubmissionDirectoryPrefix = jobSubmissionDirectoryPrefix;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/cs/Org.Apache.REEF.Client/Common/DriverFolderPreparationHelper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Common/DriverFolderPreparationHelper.cs b/lang/cs/Org.Apache.REEF.Client/Common/DriverFolderPreparationHelper.cs
index 336685f..d422737 100644
--- a/lang/cs/Org.Apache.REEF.Client/Common/DriverFolderPreparationHelper.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Common/DriverFolderPreparationHelper.cs
@@ -182,7 +182,7 @@ namespace Org.Apache.REEF.Client.Common
var lowerCasePath = fileName.ToLower();
return lowerCasePath.EndsWith(DLLFileNameExtension) ||
lowerCasePath.EndsWith(EXEFileNameExtension) ||
- lowerCasePath.StartsWith(ClientConstants.DriverJarFilePrefix);
+ lowerCasePath.StartsWith(ClientConstants.ClientJarFilePrefix);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/pom.xml b/lang/java/reef-bridge-client/pom.xml
index e0b31c0..9b1a1a6 100644
--- a/lang/java/reef-bridge-client/pom.xml
+++ b/lang/java/reef-bridge-client/pom.xml
@@ -98,6 +98,10 @@ under the License.
<artifactId>hadoop-common</artifactId>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
index 1bf830d..ef06e25 100644
--- a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
+++ b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
@@ -49,6 +49,7 @@
{ "name": "sharedJobSubmissionParameters", "type": "AvroJobSubmissionParameters" },
{ "name": "driverMemory", "type": "int" },
{ "name": "driverRecoveryTimeout", "type": "int" },
+ { "name": "dfsJobSubmissionFolder", "type": ["null", "string"], "default" : null },
{ "name": "jobSubmissionDirectoryPrefix", "type": "string" }
]
},
@@ -60,8 +61,8 @@
"fields": [
{ "name": "yarnJobSubmissionParameters", "type": "AvroYarnJobSubmissionParameters" },
{ "name": "maxApplicationSubmissions", "type": "int" },
- { "name": "securityTokenKind", "type": "string" },
- { "name": "securityTokenService", "type": "string" }
+ { "name": "securityTokenKind", "type": "string", "default": "NULL" },
+ { "name": "securityTokenService", "type": "string", "default": "NULL" }
]
}
]
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java
new file mode 100644
index 0000000..23f86ff
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.JsonDecoder;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.reef.client.DriverRestartConfiguration;
+import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.io.TcpPortConfigurationProvider;
+import org.apache.reef.javabridge.generic.JobDriver;
+import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters;
+import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
+import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
+import org.apache.reef.runtime.yarn.driver.YarnDriverRestartConfiguration;
+import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix;
+import org.apache.reef.tang.*;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This is the Java Driver configuration generator for .NET Drivers that generates
+ * the Driver configuration at runtime. Called by {@link YarnBootstrapREEFLauncher}.
+ */
+final class YarnBootstrapDriverConfigGenerator {
+ private static final Logger LOG = Logger.getLogger(YarnBootstrapDriverConfigGenerator.class.getName());
+
+ private final REEFFileNames reefFileNames;
+ private final ConfigurationSerializer configurationSerializer;
+
+ @Inject
+ private YarnBootstrapDriverConfigGenerator(final REEFFileNames reefFileNames,
+ final ConfigurationSerializer configurationSerializer) {
+ this.configurationSerializer = configurationSerializer;
+ this.reefFileNames = reefFileNames;
+ }
+
+ public String writeDriverConfigurationFile(final String bootstrapArgsLocation) throws IOException {
+ final File bootstrapArgsFile = new File(bootstrapArgsLocation);
+ final AvroYarnJobSubmissionParameters yarnBootstrapArgs =
+ readYarnJobSubmissionParametersFromFile(bootstrapArgsFile);
+ final String driverConfigPath = reefFileNames.getDriverConfigurationPath();
+
+ this.configurationSerializer.toFile(getYarnDriverConfiguration(yarnBootstrapArgs),
+ new File(driverConfigPath));
+
+ return driverConfigPath;
+ }
+
+ static Configuration getYarnDriverConfiguration(
+ final AvroYarnJobSubmissionParameters yarnJobSubmissionParams) {
+ final AvroJobSubmissionParameters jobSubmissionParameters =
+ yarnJobSubmissionParams.getSharedJobSubmissionParameters();
+ final Configuration providerConfig = Tang.Factory.getTang().newConfigurationBuilder()
+ .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class)
+ .bindNamedParameter(TcpPortRangeBegin.class, Integer.toString(jobSubmissionParameters.getTcpBeginPort()))
+ .bindNamedParameter(TcpPortRangeCount.class, Integer.toString(jobSubmissionParameters.getTcpRangeCount()))
+ .bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(jobSubmissionParameters.getTcpTryCount()))
+ .bindNamedParameter(JobSubmissionDirectoryPrefix.class,
+ yarnJobSubmissionParams.getJobSubmissionDirectoryPrefix().toString())
+ .build();
+
+ final Configuration yarnDriverConfiguration = YarnDriverConfiguration.CONF
+ .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY,
+ yarnJobSubmissionParams.getDfsJobSubmissionFolder().toString())
+ .set(YarnDriverConfiguration.JOB_IDENTIFIER, jobSubmissionParameters.getJobId().toString())
+ .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE)
+ .set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0)
+ .build();
+
+ final Configuration driverConfiguration = Configurations.merge(
+ Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER, yarnDriverConfiguration, providerConfig);
+
+ if (yarnJobSubmissionParams.getDriverRecoveryTimeout() > 0) {
+ LOG.log(Level.FINE, "Driver restart is enabled.");
+
+ final Configuration yarnDriverRestartConfiguration =
+ YarnDriverRestartConfiguration.CONF.build();
+
+ final Configuration driverRestartConfiguration =
+ DriverRestartConfiguration.CONF
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED, JobDriver.RestartHandler.class)
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE,
+ JobDriver.DriverRestartActiveContextHandler.class)
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
+ JobDriver.DriverRestartRunningTaskHandler.class)
+ .set(DriverRestartConfiguration.DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS,
+ yarnJobSubmissionParams.getDriverRecoveryTimeout())
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED,
+ JobDriver.DriverRestartCompletedHandler.class)
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTART_EVALUATOR_FAILED,
+ JobDriver.DriverRestartFailedEvaluatorHandler.class)
+ .build();
+
+ return Configurations.merge(driverConfiguration, yarnDriverRestartConfiguration, driverRestartConfiguration);
+ }
+
+ return driverConfiguration;
+ }
+
+ static AvroYarnJobSubmissionParameters readYarnJobSubmissionParametersFromFile(final File file)
+ throws IOException {
+ try (final FileInputStream fileInputStream = new FileInputStream(file)) {
+ // This is mainly a test hook.
+ return readYarnJobSubmissionParametersFromInputStream(fileInputStream);
+ }
+ }
+
+ static AvroYarnJobSubmissionParameters readYarnJobSubmissionParametersFromInputStream(
+ final InputStream inputStream) throws IOException {
+ final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
+ AvroYarnJobSubmissionParameters.getClassSchema(), inputStream);
+ final SpecificDatumReader<AvroYarnJobSubmissionParameters> reader = new SpecificDatumReader<>(
+ AvroYarnJobSubmissionParameters.class);
+ return reader.read(null, decoder);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java
new file mode 100644
index 0000000..5f6b5c2
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.annotations.audience.Interop;
+import org.apache.reef.runtime.common.REEFLauncher;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This is a bootstrap launcher for YARN for submission from C#. It allows for Java Driver
+ * configuration generation directly on the Driver without need of Java dependency if REST
+ * submission is used. Note that the name of the class must contain "REEFLauncher" for the time
+ * being in order for the Interop code to discover the class.
+ */
+@Interop(CppFiles = "DriverLauncher.cpp")
+public final class YarnBootstrapREEFLauncher {
+ private static final Logger LOG = Logger.getLogger(YarnBootstrapREEFLauncher.class.getName());
+
+ public static void main(final String[] args) throws IOException, InjectionException {
+ LOG.log(Level.INFO, "Entering BootstrapLauncher.main().");
+
+ if (args.length != 1) {
+ final String message = "Bootstrap launcher should have a single configuration file input specifying the" +
+ " job submission parameters to be deserialized to create the YarnDriverConfiguration on the fly.";
+
+ throw fatal(message, new IllegalArgumentException(message));
+ }
+
+ try {
+ final YarnBootstrapDriverConfigGenerator yarnDriverConfigurationGenerator =
+ Tang.Factory.getTang().newInjector().getInstance(YarnBootstrapDriverConfigGenerator.class);
+ REEFLauncher.main(new String[]{yarnDriverConfigurationGenerator.writeDriverConfigurationFile(args[0])});
+ } catch (final Exception exception) {
+ if (!(exception instanceof RuntimeException)) {
+ throw fatal("Failed to initialize configurations.", exception);
+ }
+
+ throw exception;
+ }
+ }
+
+ private static RuntimeException fatal(final String msg, final Throwable t) {
+ LOG.log(Level.SEVERE, msg, t);
+ return new RuntimeException(msg, t);
+ }
+
+ private YarnBootstrapREEFLauncher(){
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
new file mode 100644
index 0000000..f0b5e6c
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.JsonDecoder;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.lang.Validate;
+import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters;
+import org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters;
+import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Represents a job submission from the CS code.
+ * <p>
+ * This class exists mostly to parse and validate the command line parameters provided by the C# class
+ * `Org.Apache.REEF.Client.YARN.YARNClient`
+ */
+final class YarnClusterSubmissionFromCS {
+ private static final int DEFAULT_PRIORITY = 1;
+ private static final String DEFAULT_QUEUE = "default";
+
+ private final File driverFolder;
+ private final String jobId;
+ private final int driverMemory;
+ private final int tcpBeginPort;
+ private final int tcpRangeCount;
+ private final int tcpTryCount;
+ private final int maxApplicationSubmissions;
+ private final int driverRecoveryTimeout;
+
+ // priority and queue are static for now (always DEFAULT_PRIORITY / DEFAULT_QUEUE); the rest come from the Avro record
+ private final int priority;
+ private final String queue;
+ private final String tokenKind;
+ private final String tokenService;
+ private final String jobSubmissionDirectoryPrefix;
+ private final AvroYarnJobSubmissionParameters yarnJobSubmissionParameters;
+
+ private YarnClusterSubmissionFromCS(final AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters) {
+ yarnJobSubmissionParameters = yarnClusterJobSubmissionParameters.getYarnJobSubmissionParameters();
+
+ final AvroJobSubmissionParameters jobSubmissionParameters =
+ yarnJobSubmissionParameters.getSharedJobSubmissionParameters();
+
+ this.driverFolder = new File(jobSubmissionParameters.getJobSubmissionFolder().toString());
+ this.jobId = jobSubmissionParameters.getJobId().toString();
+ this.tcpBeginPort = jobSubmissionParameters.getTcpBeginPort();
+ this.tcpRangeCount = jobSubmissionParameters.getTcpRangeCount();
+ this.tcpTryCount = jobSubmissionParameters.getTcpTryCount();
+ this.maxApplicationSubmissions = yarnClusterJobSubmissionParameters.getMaxApplicationSubmissions();
+ this.driverRecoveryTimeout = yarnJobSubmissionParameters.getDriverRecoveryTimeout();
+ this.driverMemory = yarnJobSubmissionParameters.getDriverMemory();
+ this.priority = DEFAULT_PRIORITY;
+ this.queue = DEFAULT_QUEUE;
+ this.tokenKind = yarnClusterJobSubmissionParameters.getSecurityTokenKind().toString();
+ this.tokenService = yarnClusterJobSubmissionParameters.getSecurityTokenService().toString();
+ this.jobSubmissionDirectoryPrefix = yarnJobSubmissionParameters.getJobSubmissionDirectoryPrefix().toString();
+
+ Validate.notEmpty(jobId, "The job id is null or empty");
+ Validate.isTrue(driverMemory > 0, "The amount of driver memory given is <= 0.");
+ Validate.isTrue(tcpBeginPort >= 0, "The tcp start port given is < 0.");
+ Validate.isTrue(tcpRangeCount > 0, "The tcp range given is <= 0.");
+ Validate.isTrue(tcpTryCount > 0, "The tcp retry count given is <= 0.");
+ Validate.isTrue(maxApplicationSubmissions > 0, "The maximum number of app submissions given is <= 0.");
+ Validate.notEmpty(queue, "The queue is null or empty");
+ Validate.notEmpty(tokenKind, "Token kind should be either NULL or some custom non empty value"); // NOTE(review): "NULL" is presumably the literal string sentinel, not a null reference - confirm with callers
+ Validate.notEmpty(tokenService, "Token service should be either NULL or some custom non empty value");
+ Validate.notEmpty(jobSubmissionDirectoryPrefix, "Job submission directory prefix should not be empty");
+ }
+
+ @Override
+ public String toString() { // omits yarnJobSubmissionParameters; all other fields are included
+ return "YarnClusterSubmissionFromCS{" +
+ "driverFolder=" + driverFolder +
+ ", jobId='" + jobId + '\'' +
+ ", driverMemory=" + driverMemory +
+ ", tcpBeginPort=" + tcpBeginPort +
+ ", tcpRangeCount=" + tcpRangeCount +
+ ", tcpTryCount=" + tcpTryCount +
+ ", maxApplicationSubmissions=" + maxApplicationSubmissions +
+ ", driverRecoveryTimeout=" + driverRecoveryTimeout +
+ ", priority=" + priority +
+ ", queue='" + queue + '\'' +
+ ", tokenKind='" + tokenKind + '\'' +
+ ", tokenService='" + tokenService + '\'' +
+ ", jobSubmissionDirectoryPrefix='" + jobSubmissionDirectoryPrefix + '\'' +
+ '}';
+ }
+
+ /**
+ * @return The local folder where the driver is staged.
+ */
+ File getDriverFolder() {
+ return driverFolder;
+ }
+
+ /**
+ * @return the id of the job to be submitted.
+ */
+ String getJobId() {
+ return jobId;
+ }
+
+ /**
+ * @return the amount of memory to allocate for the Driver, in MB.
+ */
+ int getDriverMemory() {
+ return driverMemory;
+ }
+
+ /**
+ * @return The priority of the job submission (currently always the default priority).
+ */
+ int getPriority() {
+ return priority;
+ }
+
+ /**
+ * @return The queue the driver will be submitted to (currently always the default queue).
+ */
+ String getQueue() {
+ return queue;
+ }
+
+ /**
+ * @return The security token kind
+ */
+ String getTokenKind() {
+ return tokenKind;
+ }
+
+ /**
+ * @return The security token service
+ */
+ String getTokenService() {
+ return tokenService;
+ }
+
+ /**
+ * @return The maximum number of times the application can be submitted.
+ */
+ int getMaxApplicationSubmissions(){
+ return maxApplicationSubmissions;
+ }
+
+ /**
+ * @return The time allowed for Driver recovery to recover all its Evaluators.
+ */
+ int getDriverRecoveryTimeout() {
+ return driverRecoveryTimeout;
+ }
+
+ /**
+ * @return The submission parameters for YARN jobs (the same mutable Avro object held by this instance, not a copy).
+ */
+ AvroYarnJobSubmissionParameters getYarnJobSubmissionParameters() {
+ return yarnJobSubmissionParameters;
+ }
+
+ /**
+ * Takes the YARN cluster job submission parameters file, deserializes it, and creates the submission object.
+ */
+ static YarnClusterSubmissionFromCS fromJobSubmissionParametersFile(final File yarnClusterJobSubmissionParametersFile)
+ throws IOException {
+ try (final FileInputStream fileInputStream = new FileInputStream(yarnClusterJobSubmissionParametersFile)) {
+ // The stream-based reader is kept as a separate method mainly as a test hook.
+ return readYarnClusterSubmissionFromCSFromInputStream(fileInputStream);
+ }
+ }
+
+ static YarnClusterSubmissionFromCS readYarnClusterSubmissionFromCSFromInputStream(
+ final InputStream inputStream) throws IOException {
+ final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
+ AvroYarnClusterJobSubmissionParameters.getClassSchema(), inputStream); // the stream is read as Avro JSON encoding
+ final SpecificDatumReader<AvroYarnClusterJobSubmissionParameters> reader = new SpecificDatumReader<>(
+ AvroYarnClusterJobSubmissionParameters.class);
+ final AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters = reader.read(null, decoder);
+ return new YarnClusterSubmissionFromCS(yarnClusterJobSubmissionParameters); // the constructor validates the decoded values
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnDriverConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnDriverConfigurationGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnDriverConfigurationGenerator.java
deleted file mode 100644
index 2c91b78..0000000
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnDriverConfigurationGenerator.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.bridge.client;
-
-import org.apache.reef.client.DriverRestartConfiguration;
-import org.apache.reef.client.parameters.DriverConfigurationProviders;
-import org.apache.reef.javabridge.generic.JobDriver;
-import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
-import org.apache.reef.runtime.common.files.REEFFileNames;
-import org.apache.reef.runtime.yarn.driver.JobSubmissionDirectoryProvider;
-import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
-import org.apache.reef.runtime.yarn.driver.YarnDriverRestartConfiguration;
-import org.apache.reef.tang.*;
-import org.apache.reef.tang.annotations.Name;
-import org.apache.reef.tang.annotations.NamedParameter;
-import org.apache.reef.tang.annotations.Parameter;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.tang.formats.ConfigurationSerializer;
-
-import javax.inject.Inject;
-import java.io.File;
-import java.io.IOException;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Does client side manipulation of driver configuration for YARN runtime.
- */
-final class YarnDriverConfigurationGenerator {
- private static final Logger LOG = Logger.getLogger(YarnDriverConfigurationGenerator.class.getName());
- private final Set<ConfigurationProvider> configurationProviders;
- private final int driverRestartEvaluatorRecoverySeconds;
- private final REEFFileNames fileNames;
- private final ConfigurationSerializer configurationSerializer;
-
- @Inject
- private YarnDriverConfigurationGenerator(@Parameter(DriverConfigurationProviders.class)
- final Set<ConfigurationProvider> configurationProviders,
- @Parameter(SubmissionDriverRestartEvaluatorRecoverySeconds.class)
- final int driverRestartEvaluatorRecoverySeconds,
- final ConfigurationSerializer configurationSerializer,
- final REEFFileNames fileNames) {
- this.configurationProviders = configurationProviders;
- this.driverRestartEvaluatorRecoverySeconds = driverRestartEvaluatorRecoverySeconds;
- this.fileNames = fileNames;
- this.configurationSerializer = configurationSerializer;
- }
-
- /**
- * Writes driver configuration to disk.
- * @param driverFolder the folder containing the `reef` folder. Only that `reef` folder will be in the JAR.
- * @param jobId id of the job to be submitted
- * @param jobSubmissionFolder job submission folder on DFS
- * @return the prepared driver configuration
- * @throws IOException
- */
- public Configuration writeConfiguration(final File driverFolder,
- final String jobId,
- final String jobSubmissionFolder) throws IOException {
- final File driverConfigurationFile = new File(driverFolder, this.fileNames.getDriverConfigurationPath());
- final Configuration yarnDriverConfiguration = YarnDriverConfiguration.CONF
- .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, jobSubmissionFolder)
- .set(YarnDriverConfiguration.JOB_IDENTIFIER, jobId)
- .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE)
- .set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0)
- .build();
-
- final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder();
- for (final ConfigurationProvider configurationProvider : this.configurationProviders) {
- configurationBuilder.addConfiguration(configurationProvider.getConfiguration());
- }
- final Configuration providedConfigurations = configurationBuilder.build();
-
- Configuration driverConfiguration = Configurations.merge(
- Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER,
- yarnDriverConfiguration,
- providedConfigurations);
-
- if (driverRestartEvaluatorRecoverySeconds > 0) {
- LOG.log(Level.FINE, "Driver restart is enabled.");
-
- final Configuration yarnDriverRestartConfiguration =
- YarnDriverRestartConfiguration.CONF.build();
-
- final Configuration driverRestartConfiguration =
- DriverRestartConfiguration.CONF
- .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED, JobDriver.RestartHandler.class)
- .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE,
- JobDriver.DriverRestartActiveContextHandler.class)
- .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
- JobDriver.DriverRestartRunningTaskHandler.class)
- .set(DriverRestartConfiguration.DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS,
- driverRestartEvaluatorRecoverySeconds)
- .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED,
- JobDriver.DriverRestartCompletedHandler.class)
- .set(DriverRestartConfiguration.ON_DRIVER_RESTART_EVALUATOR_FAILED,
- JobDriver.DriverRestartFailedEvaluatorHandler.class)
- .build();
-
- driverConfiguration = Configurations.merge(
- driverConfiguration, yarnDriverRestartConfiguration, driverRestartConfiguration);
- }
-
- this.configurationSerializer.toFile(driverConfiguration, driverConfigurationFile);
- return driverConfiguration;
- }
-
- /**
- * This main is executed from .NET to perform driver config generation.
- * For arguments detail:
- * @see org.apache.reef.bridge.client.YarnSubmissionFromCS#fromJobSubmissionParametersFile(File)
- */
- public static void main(final String[] args) throws InjectionException, IOException {
- final File jobSubmissionParametersFile = new File(args[0]);
- if (!(jobSubmissionParametersFile.exists() && jobSubmissionParametersFile.canRead())) {
- throw new IOException("Unable to open and read " + jobSubmissionParametersFile.getAbsolutePath());
- }
-
- final YarnSubmissionFromCS yarnSubmission =
- YarnSubmissionFromCS.fromJobSubmissionParametersFile(jobSubmissionParametersFile);
-
- LOG.log(Level.INFO, "YARN driver config generation received from C#: {0}", yarnSubmission);
- final Configuration yarnConfiguration = yarnSubmission.getRuntimeConfiguration();
-
- final Injector injector = Tang.Factory.getTang().newInjector(yarnConfiguration);
- final YarnDriverConfigurationGenerator yarnClientConfigurationGenerator =
- injector.getInstance(YarnDriverConfigurationGenerator.class);
- final JobSubmissionDirectoryProvider directoryProvider = injector.getInstance(JobSubmissionDirectoryProvider.class);
- final String jobId = yarnSubmission.getJobId();
-
- LOG.log(Level.INFO, "Writing driver config for job {0}", jobId);
- yarnClientConfigurationGenerator.writeConfiguration(
- yarnSubmission.getDriverFolder(), jobId, directoryProvider.getJobSubmissionDirectoryPath(jobId).toString());
- System.exit(0);
- LOG.log(Level.INFO, "End of main in Java YarnDriverConfigurationGenerator");
- }
-}
-
-/**
- * How long the driver should wait before timing out on evaluator
- * recovery in seconds. Defaults to -1. If value is negative, the restart functionality will not be
- * enabled. Only used by .NET job submission.
- */
-@NamedParameter(doc = "How long the driver should wait before timing out on evaluator" +
- " recovery in seconds. Defaults to -1. If value is negative, the restart functionality will not be" +
- " enabled. Only used by .NET job submission.", default_value = "-1")
-final class SubmissionDriverRestartEvaluatorRecoverySeconds implements Name<Integer> {
- private SubmissionDriverRestartEvaluatorRecoverySeconds() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index b1d0c8b..93e3e92 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -18,6 +18,7 @@
*/
package org.apache.reef.bridge.client;
+import org.apache.commons.lang.Validate;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -27,21 +28,20 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.reef.driver.parameters.MaxApplicationSubmissions;
-import org.apache.reef.driver.parameters.ResourceManagerPreserveEvaluators;
import org.apache.reef.runtime.common.files.ClasspathProvider;
import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix;
+import org.apache.reef.runtime.yarn.YarnClasspathProvider;
import org.apache.reef.runtime.yarn.client.SecurityTokenProvider;
import org.apache.reef.runtime.yarn.client.YarnSubmissionHelper;
import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
import org.apache.reef.runtime.yarn.client.uploader.JobUploader;
+import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor;
import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.JARFileMaker;
import javax.inject.Inject;
@@ -49,6 +49,7 @@ import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -61,36 +62,29 @@ public final class YarnJobSubmissionClient {
private static final Logger LOG = Logger.getLogger(YarnJobSubmissionClient.class.getName());
private final JobUploader uploader;
- private final ConfigurationSerializer configurationSerializer;
private final REEFFileNames fileNames;
private final YarnConfiguration yarnConfiguration;
private final ClasspathProvider classpath;
- private final int maxApplicationSubmissions;
private final SecurityTokenProvider tokenProvider;
private final List<String> commandPrefixList;
- private final YarnDriverConfigurationGenerator configurationGenerator;
+ private final YarnJobSubmissionParametersFileGenerator jobSubmissionParametersGenerator;
@Inject
YarnJobSubmissionClient(final JobUploader uploader,
final YarnConfiguration yarnConfiguration,
- final ConfigurationSerializer configurationSerializer,
final REEFFileNames fileNames,
final ClasspathProvider classpath,
- @Parameter(MaxApplicationSubmissions.class)
- final int maxApplicationSubmissions,
@Parameter(DriverLaunchCommandPrefix.class)
final List<String> commandPrefixList,
final SecurityTokenProvider tokenProvider,
- final YarnDriverConfigurationGenerator configurationGenerator) {
+ final YarnJobSubmissionParametersFileGenerator jobSubmissionParametersGenerator) {
this.uploader = uploader;
- this.configurationSerializer = configurationSerializer;
this.fileNames = fileNames;
this.yarnConfiguration = yarnConfiguration;
this.classpath = classpath;
- this.maxApplicationSubmissions = maxApplicationSubmissions;
this.tokenProvider = tokenProvider;
this.commandPrefixList = commandPrefixList;
- this.configurationGenerator = configurationGenerator;
+ this.jobSubmissionParametersGenerator = jobSubmissionParametersGenerator;
}
/**
@@ -99,6 +93,7 @@ public final class YarnJobSubmissionClient {
* @throws IOException
*/
private File makeJar(final File driverFolder) throws IOException {
+ Validate.isTrue(driverFolder.exists());
final File jarFile = new File(driverFolder.getParentFile(), driverFolder.getName() + ".jar");
final File reefFolder = new File(driverFolder, fileNames.getREEFFolderName());
if (!reefFolder.isDirectory()) {
@@ -109,7 +104,7 @@ public final class YarnJobSubmissionClient {
return jarFile;
}
- private void launch(final YarnSubmissionFromCS yarnSubmission) throws IOException, YarnException {
+ private void launch(final YarnClusterSubmissionFromCS yarnSubmission) throws IOException, YarnException {
// ------------------------------------------------------------------------
// Get an application ID
try (final YarnSubmissionHelper submissionHelper =
@@ -118,10 +113,7 @@ public final class YarnJobSubmissionClient {
// ------------------------------------------------------------------------
// Prepare the JAR
final JobFolder jobFolderOnDFS = this.uploader.createJobFolder(submissionHelper.getApplicationId());
- final Configuration jobSubmissionConfiguration =
- this.configurationGenerator.writeConfiguration(yarnSubmission.getDriverFolder(),
- yarnSubmission.getJobId(),
- jobFolderOnDFS.getPath().toString());
+ this.jobSubmissionParametersGenerator.writeConfiguration(yarnSubmission, jobFolderOnDFS);
final File jarFile = makeJar(yarnSubmission.getDriverFolder());
LOG.log(Level.INFO, "Created job submission jar file: {0}", jarFile);
@@ -131,29 +123,26 @@ public final class YarnJobSubmissionClient {
final LocalResource jarFileOnDFS = jobFolderOnDFS.uploadAsLocalResource(jarFile);
LOG.info("Uploaded job submission JAR");
- final Injector jobParamsInjector = Tang.Factory.getTang().newInjector(jobSubmissionConfiguration);
-
// ------------------------------------------------------------------------
// Submit
- try {
- submissionHelper
- .addLocalResource(this.fileNames.getREEFFolderName(), jarFileOnDFS)
- .setApplicationName(yarnSubmission.getJobId())
- .setDriverMemory(yarnSubmission.getDriverMemory())
- .setPriority(yarnSubmission.getPriority())
- .setQueue(yarnSubmission.getQueue())
- .setMaxApplicationAttempts(this.maxApplicationSubmissions)
- .setPreserveEvaluators(jobParamsInjector.getNamedInstance(ResourceManagerPreserveEvaluators.class))
- .submit();
- } catch (InjectionException ie) {
- throw new RuntimeException("Unable to submit job due to " + ie);
- }
+ submissionHelper
+ .addLocalResource(this.fileNames.getREEFFolderName(), jarFileOnDFS)
+ .setApplicationName(yarnSubmission.getJobId())
+ .setDriverMemory(yarnSubmission.getDriverMemory())
+ .setPriority(yarnSubmission.getPriority())
+ .setQueue(yarnSubmission.getQueue())
+ .setMaxApplicationAttempts(yarnSubmission.getMaxApplicationSubmissions())
+ .setPreserveEvaluators(yarnSubmission.getDriverRecoveryTimeout() > 0)
+ .setLauncherClass(YarnBootstrapREEFLauncher.class)
+ .setConfigurationFileName(fileNames.getYarnBootstrapParamFilePath())
+ .submit();
writeDriverHttpEndPoint(yarnSubmission.getDriverFolder(),
submissionHelper.getStringApplicationId(), jobFolderOnDFS.getPath());
}
}
- private static void writeSecurityTokenToUserCredential(final YarnSubmissionFromCS yarnSubmission) throws IOException {
+ private static void writeSecurityTokenToUserCredential(
+ final YarnClusterSubmissionFromCS yarnSubmission) throws IOException {
final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
final REEFFileNames fileNames = new REEFFileNames();
final String securityTokenIdentifierFile = fileNames.getSecurityTokenIdentifierFile();
@@ -229,7 +218,7 @@ public final class YarnJobSubmissionClient {
/**
* .NET client calls into this main method for job submission.
* For arguments detail:
- * @see org.apache.reef.bridge.client.YarnSubmissionFromCS#fromJobSubmissionParametersFile(File)
+ * @see YarnClusterSubmissionFromCS#fromJobSubmissionParametersFile(File)
*/
public static void main(final String[] args) throws InjectionException, IOException, YarnException {
final File jobSubmissionParametersFile = new File(args[0]);
@@ -237,8 +226,8 @@ public final class YarnJobSubmissionClient {
throw new IOException("Unable to open and read " + jobSubmissionParametersFile.getAbsolutePath());
}
- final YarnSubmissionFromCS yarnSubmission =
- YarnSubmissionFromCS.fromJobSubmissionParametersFile(jobSubmissionParametersFile);
+ final YarnClusterSubmissionFromCS yarnSubmission =
+ YarnClusterSubmissionFromCS.fromJobSubmissionParametersFile(jobSubmissionParametersFile);
LOG.log(Level.INFO, "YARN job submission received from C#: {0}", yarnSubmission);
if (!yarnSubmission.getTokenKind().equalsIgnoreCase("NULL")) {
@@ -250,11 +239,20 @@ public final class YarnJobSubmissionClient {
LOG.log(Level.FINE, "Did not find security token");
}
- final Configuration yarnConfiguration = yarnSubmission.getRuntimeConfiguration();
- final YarnJobSubmissionClient client = Tang.Factory.getTang()
- .newInjector(yarnConfiguration)
+ final List<String> launchCommandPrefix = new ArrayList<String>() {{
+ add(new REEFFileNames().getDriverLauncherExeFile().toString());
+ }};
+
+ final Configuration yarnJobSubmissionClientConfig = Tang.Factory.getTang().newConfigurationBuilder()
+ .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class)
+ .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class)
+ .bindList(DriverLaunchCommandPrefix.class, launchCommandPrefix)
+ .build();
+ final YarnJobSubmissionClient client = Tang.Factory.getTang().newInjector(yarnJobSubmissionClientConfig)
.getInstance(YarnJobSubmissionClient.class);
+
client.launch(yarnSubmission);
+
LOG.log(Level.INFO, "Returned from launch in Java YarnJobSubmissionClient");
System.exit(0);
LOG.log(Level.INFO, "End of main in Java YarnJobSubmissionClient");
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionParametersFileGenerator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionParametersFileGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionParametersFileGenerator.java
new file mode 100644
index 0000000..ccb5e8b
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionParametersFileGenerator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.io.JsonEncoder;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.logging.Logger;
+
+/**
+ * Writes the Avro YARN job submission parameters, JSON-encoded, to a file inside the local driver folder.
+ */
+final class YarnJobSubmissionParametersFileGenerator {
+ private static final Logger LOG = Logger.getLogger(YarnJobSubmissionParametersFileGenerator.class.getName()); // NOTE(review): not used anywhere in this class
+ private final REEFFileNames fileNames;
+
+ @Inject
+ private YarnJobSubmissionParametersFileGenerator(final REEFFileNames fileNames) {
+ this.fileNames = fileNames;
+ }
+
+ /**
+ * Writes the YARN job submission parameters to the file named by REEFFileNames#getYarnBootstrapParamFilePath().
+ * @param yarnClusterSubmissionFromCS the submission providing the driver folder and the parameters to encode.
+ * @param jobFolderOnDFS the job folder on DFS; its path is stored in the parameters as the DFS job submission folder.
+ * @throws IOException if the parameters file cannot be opened or written.
+ */
+ public void writeConfiguration(final YarnClusterSubmissionFromCS yarnClusterSubmissionFromCS,
+ final JobFolder jobFolderOnDFS) throws IOException {
+ final File yarnParametersFile = new File(yarnClusterSubmissionFromCS.getDriverFolder(),
+ fileNames.getYarnBootstrapParamFilePath());
+
+ try (final FileOutputStream fileOutputStream = new FileOutputStream(yarnParametersFile)) {
+ // The stream-based writer is kept as a separate method mainly as a test hook.
+ writeAvroYarnJobSubmissionParametersToOutputStream(
+ yarnClusterSubmissionFromCS, jobFolderOnDFS.getPath().toString(), fileOutputStream);
+ }
+ }
+
+ static void writeAvroYarnJobSubmissionParametersToOutputStream(
+ final YarnClusterSubmissionFromCS yarnClusterSubmissionFromCS,
+ final String jobFolderOnDFSPath,
+ final OutputStream outputStream) throws IOException {
+ final DatumWriter<AvroYarnJobSubmissionParameters> datumWriter =
+ new SpecificDatumWriter<>(AvroYarnJobSubmissionParameters.class);
+
+ final AvroYarnJobSubmissionParameters jobSubmissionParameters =
+ yarnClusterSubmissionFromCS.getYarnJobSubmissionParameters();
+ jobSubmissionParameters.setDfsJobSubmissionFolder(jobFolderOnDFSPath); // NOTE: mutates the parameters object returned by the submission
+ final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(jobSubmissionParameters.getSchema(),
+ outputStream);
+ datumWriter.write(jobSubmissionParameters, encoder);
+ encoder.flush(); // flush the encoder first, then the underlying stream; the caller closes the stream
+ outputStream.flush();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
deleted file mode 100644
index e8c5674..0000000
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.bridge.client;
-
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.JsonDecoder;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.lang.Validate;
-import org.apache.reef.client.parameters.DriverConfigurationProviders;
-import org.apache.reef.driver.parameters.MaxApplicationSubmissions;
-import org.apache.reef.io.TcpPortConfigurationProvider;
-import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters;
-import org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters;
-import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
-import org.apache.reef.runtime.common.files.REEFFileNames;
-import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix;
-import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
-import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Configurations;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Represents a job submission from the CS code.
- * <p>
- * This class exists mostly to parse and validate the command line parameters provided by the C# class
- * `Org.Apache.REEF.Client.YARN.YARNClient`
- */
-final class YarnSubmissionFromCS {
- private static final int DEFAULT_PRIORITY = 1;
- private static final String DEFAULT_QUEUE = "default";
-
- private final File driverFolder;
- private final String jobId;
- private final int driverMemory;
- private final int tcpBeginPort;
- private final int tcpRangeCount;
- private final int tcpTryCount;
- private final int maxApplicationSubmissions;
- private final int driverRecoveryTimeout;
-
- // Static for now
- private final int priority;
- private final String queue;
- private final String tokenKind;
- private final String tokenService;
- private final String jobSubmissionDirectoryPrefix;
-
- private YarnSubmissionFromCS(final AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters) {
- final AvroYarnJobSubmissionParameters yarnJobSubmissionParameters =
- yarnClusterJobSubmissionParameters.getYarnJobSubmissionParameters();
-
- final AvroJobSubmissionParameters jobSubmissionParameters =
- yarnJobSubmissionParameters.getSharedJobSubmissionParameters();
-
- this.driverFolder = new File(jobSubmissionParameters.getJobSubmissionFolder().toString());
- this.jobId = jobSubmissionParameters.getJobId().toString();
- this.tcpBeginPort = jobSubmissionParameters.getTcpBeginPort();
- this.tcpRangeCount = jobSubmissionParameters.getTcpRangeCount();
- this.tcpTryCount = jobSubmissionParameters.getTcpTryCount();
- this.maxApplicationSubmissions = yarnClusterJobSubmissionParameters.getMaxApplicationSubmissions();
- this.driverRecoveryTimeout = yarnJobSubmissionParameters.getDriverRecoveryTimeout();
- this.driverMemory = yarnJobSubmissionParameters.getDriverMemory();
- this.priority = DEFAULT_PRIORITY;
- this.queue = DEFAULT_QUEUE;
- this.tokenKind = yarnClusterJobSubmissionParameters.getSecurityTokenKind().toString();
- this.tokenService = yarnClusterJobSubmissionParameters.getSecurityTokenService().toString();
- this.jobSubmissionDirectoryPrefix = yarnJobSubmissionParameters.getJobSubmissionDirectoryPrefix().toString();
-
- Validate.isTrue(driverFolder.exists(), "The driver folder given does not exist.");
- Validate.notEmpty(jobId, "The job id is null or empty");
- Validate.isTrue(driverMemory > 0, "The amount of driver memory given is <= 0.");
- Validate.isTrue(tcpBeginPort >= 0, "The tcp start port given is < 0.");
- Validate.isTrue(tcpRangeCount > 0, "The tcp range given is <= 0.");
- Validate.isTrue(tcpTryCount > 0, "The tcp retry count given is <= 0.");
- Validate.isTrue(maxApplicationSubmissions > 0, "The maximum number of app submissions given is <= 0.");
- Validate.notEmpty(queue, "The queue is null or empty");
- Validate.notEmpty(tokenKind, "Token kind should be either NULL or some custom non empty value");
- Validate.notEmpty(tokenService, "Token service should be either NULL or some custom non empty value");
- Validate.notEmpty(jobSubmissionDirectoryPrefix, "Job submission directory prefix should not be empty");
- }
-
- @Override
- public String toString() {
- return "YarnSubmissionFromCS{" +
- "driverFolder=" + driverFolder +
- ", jobId='" + jobId + '\'' +
- ", driverMemory=" + driverMemory +
- ", tcpBeginPort=" + tcpBeginPort +
- ", tcpRangeCount=" + tcpRangeCount +
- ", tcpTryCount=" + tcpTryCount +
- ", maxApplicationSubmissions=" + maxApplicationSubmissions +
- ", driverRecoveryTimeout=" + driverRecoveryTimeout +
- ", priority=" + priority +
- ", queue='" + queue + '\'' +
- ", tokenKind='" + tokenKind + '\'' +
- ", tokenService='" + tokenService + '\'' +
- ", jobSubmissionDirectoryPrefix='" + jobSubmissionDirectoryPrefix + '\'' +
- '}';
- }
-
- /**
- * Produces the YARN Runtime Configuration based on the parameters passed from C#.
- *
- * @return the YARN Runtime Configuration based on the parameters passed from C#.
- */
- Configuration getRuntimeConfiguration() {
- final Configuration providerConfig = Tang.Factory.getTang().newConfigurationBuilder()
- .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class)
- .bindNamedParameter(TcpPortRangeBegin.class, Integer.toString(tcpBeginPort))
- .bindNamedParameter(TcpPortRangeCount.class, Integer.toString(tcpRangeCount))
- .bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(tcpTryCount))
- .bindNamedParameter(JobSubmissionDirectoryPrefix.class, jobSubmissionDirectoryPrefix)
- .build();
-
- final List<String> driverLaunchCommandPrefixList = new ArrayList<>();
- driverLaunchCommandPrefixList.add(new REEFFileNames().getDriverLauncherExeFile().toString());
-
- final Configuration yarnJobSubmissionClientParamsConfig = Tang.Factory.getTang().newConfigurationBuilder()
- .bindNamedParameter(SubmissionDriverRestartEvaluatorRecoverySeconds.class,
- Integer.toString(driverRecoveryTimeout))
- .bindNamedParameter(MaxApplicationSubmissions.class, Integer.toString(maxApplicationSubmissions))
- .bindList(DriverLaunchCommandPrefix.class, driverLaunchCommandPrefixList)
- .build();
-
- return Configurations.merge(YarnClientConfiguration.CONF.build(), providerConfig,
- yarnJobSubmissionClientParamsConfig);
- }
-
- /**
- * @return The local folder where the driver is staged.
- */
- File getDriverFolder() {
- return driverFolder;
- }
-
- /**
- * @return the id of the job to be submitted.
- */
- String getJobId() {
- return jobId;
- }
-
- /**
- * @return the amount of memory to allocate for the Driver, in MB.
- */
- int getDriverMemory() {
- return driverMemory;
- }
-
- /**
- * @return The priority of the job submission
- */
- int getPriority() {
- return priority;
- }
-
- /**
- * @return The queue the driver will be submitted to.
- */
- String getQueue() {
- return queue;
- }
-
- /**
- * @return The security token kind
- */
- String getTokenKind() {
- return tokenKind;
- }
-
- /**
- * @return The security token service
- */
- String getTokenService() {
- return tokenService;
- }
-
- /**
- * Takes the YARN cluster job submission configuration file, deserializes it, and creates submission object.
- */
- static YarnSubmissionFromCS fromJobSubmissionParametersFile(final File yarnClusterJobSubmissionParametersFile)
- throws IOException {
- try (final FileInputStream fileInputStream = new FileInputStream(yarnClusterJobSubmissionParametersFile)) {
- final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
- AvroYarnClusterJobSubmissionParameters.getClassSchema(), fileInputStream);
- final SpecificDatumReader<AvroYarnClusterJobSubmissionParameters> reader = new SpecificDatumReader<>(
- AvroYarnClusterJobSubmissionParameters.class);
- final AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters = reader.read(null, decoder);
-
- return new YarnSubmissionFromCS(yarnClusterJobSubmissionParameters);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
new file mode 100644
index 0000000..ba930d4
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters;
+import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
+import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
+import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
+import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount;
+import org.junit.Test;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Tests for generating Driver configuration by bootstrapping the process to the
+ * {@link YarnBootstrapREEFLauncher}. Tests the Avro job submission serialization and
+ * deserialization from C# using mocked, expected Strings.
+ */
+public final class TestAvroJobSubmissionParametersSerializationFromCS {
+ private static final String NULL_REP = "NULL";
+ private static final String STRING_REP = "HelloREEF";
+ private static final String STRING_REP_QUOTED = "\"" + STRING_REP + "\"";
+ private static final long NUMBER_REP = 12345;
+ private static final String AVRO_YARN_PARAMETERS_SERIALIZED_STRING =
+ "{" +
+ "\"sharedJobSubmissionParameters\":" +
+ "{" +
+ "\"jobId\":" + STRING_REP_QUOTED + "," +
+ "\"tcpBeginPort\":" + NUMBER_REP + "," +
+ "\"tcpRangeCount\":" + NUMBER_REP + "," +
+ "\"tcpTryCount\":" + NUMBER_REP + "," +
+ "\"jobSubmissionFolder\":" + STRING_REP_QUOTED +
+ "}," +
+ "\"driverMemory\":" + NUMBER_REP + "," +
+ "\"driverRecoveryTimeout\":" + NUMBER_REP + "," +
+ "\"dfsJobSubmissionFolder\":" +
+ "{" +
+ "\"string\": " + STRING_REP_QUOTED +
+ "}," +
+ "\"jobSubmissionDirectoryPrefix\":" + STRING_REP_QUOTED +
+ "}";
+
+ private static final String AVRO_YARN_CLUSTER_PARAMETERS_SERIALIZED_STRING =
+ "{" +
+ "\"yarnJobSubmissionParameters\":" + AVRO_YARN_PARAMETERS_SERIALIZED_STRING + "," +
+ "\"maxApplicationSubmissions\":" + NUMBER_REP + "," +
+ "\"securityTokenKind\":\"" + NULL_REP + "\"," +
+ "\"securityTokenService\":\"" + NULL_REP + "\"" +
+ "}";
+
+ /**
+ * Tests deserialization of the Avro parameters for submission from the cluster from C#.
+ * @throws IOException
+ */
+ @Test
+ public void testAvroYarnClusterParametersDeserialization() throws IOException {
+ final YarnClusterSubmissionFromCS yarnClusterSubmissionFromCS = createYarnClusterSubmissionFromCS();
+ assert yarnClusterSubmissionFromCS.getDriverFolder().equals(new File(STRING_REP));
+ assert yarnClusterSubmissionFromCS.getDriverMemory() == NUMBER_REP;
+ assert yarnClusterSubmissionFromCS.getDriverRecoveryTimeout() == NUMBER_REP;
+ assert yarnClusterSubmissionFromCS.getJobId().equals(STRING_REP);
+ assert yarnClusterSubmissionFromCS.getMaxApplicationSubmissions() == NUMBER_REP;
+ assert yarnClusterSubmissionFromCS.getTokenKind().equals(NULL_REP);
+ assert yarnClusterSubmissionFromCS.getTokenService().equals(NULL_REP);
+
+ verifyYarnJobSubmissionParams(yarnClusterSubmissionFromCS.getYarnJobSubmissionParameters());
+ }
+
+ /**
+ * Tests deserialization of the Avro parameters for YARN from C#.
+ * @throws IOException
+ */
+ @Test
+ public void testAvroYarnParametersDeserialization() throws IOException {
+ verifyYarnJobSubmissionParams(createAvroYarnJobSubmissionParameters());
+ }
+
+ /**
+ * Tests a round-trip serialization/deserialization process of the Avro parameters from C#.
+ * @throws IOException
+ */
+ @Test
+ public void testAvroYarnParametersSerialization() throws IOException {
+ try (final ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+ YarnJobSubmissionParametersFileGenerator.writeAvroYarnJobSubmissionParametersToOutputStream(
+ createYarnClusterSubmissionFromCS(), STRING_REP, outputStream);
+ final byte[] content = outputStream.toByteArray();
+ try (final InputStream stream = new ByteArrayInputStream(content)) {
+ verifyYarnJobSubmissionParams(
+ YarnBootstrapDriverConfigGenerator.readYarnJobSubmissionParametersFromInputStream(stream));
+ }
+ }
+ }
+
+ /**
+ * Tests that the DriverConfiguration is bound with Avro parameters from C#.
+ * @throws IOException
+ * @throws InjectionException
+ */
+ @Test
+ public void testYarnBootstrapDriverConfigGenerator() throws IOException, InjectionException {
+ final Configuration yarnBootstrapDriverConfig =
+ YarnBootstrapDriverConfigGenerator.getYarnDriverConfiguration(createAvroYarnJobSubmissionParameters());
+ final Injector injector = Tang.Factory.getTang().newInjector(yarnBootstrapDriverConfig);
+
+ assert injector.getNamedInstance(JobSubmissionDirectory.class).equals(STRING_REP);
+ assert injector.getNamedInstance(JobSubmissionDirectoryPrefix.class).equals(STRING_REP);
+ assert injector.getNamedInstance(JobIdentifier.class).equals(STRING_REP);
+ assert injector.getNamedInstance(TcpPortRangeBegin.class) == NUMBER_REP;
+ assert injector.getNamedInstance(TcpPortRangeCount.class) == NUMBER_REP;
+ assert injector.getNamedInstance(TcpPortRangeTryCount.class) == NUMBER_REP;
+ }
+
+ private static AvroYarnJobSubmissionParameters createAvroYarnJobSubmissionParameters() throws IOException {
+ try (final InputStream stream =
+ new ByteArrayInputStream(AVRO_YARN_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) {
+ return YarnBootstrapDriverConfigGenerator.readYarnJobSubmissionParametersFromInputStream(stream);
+ }
+ }
+
+ private static YarnClusterSubmissionFromCS createYarnClusterSubmissionFromCS() throws IOException {
+ try (final InputStream stream =
+ new ByteArrayInputStream(
+ AVRO_YARN_CLUSTER_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) {
+ return YarnClusterSubmissionFromCS.readYarnClusterSubmissionFromCSFromInputStream(stream);
+ }
+ }
+
+ private static void verifyYarnJobSubmissionParams(final AvroYarnJobSubmissionParameters jobSubmissionParameters) {
+ final AvroJobSubmissionParameters sharedJobSubmissionParams =
+ jobSubmissionParameters.getSharedJobSubmissionParameters();
+ assert sharedJobSubmissionParams.getJobId().toString().equals(STRING_REP);
+ assert sharedJobSubmissionParams.getJobSubmissionFolder().toString().equals(STRING_REP);
+ assert sharedJobSubmissionParams.getTcpBeginPort() == NUMBER_REP;
+ assert sharedJobSubmissionParams.getTcpRangeCount() == NUMBER_REP;
+ assert sharedJobSubmissionParams.getTcpTryCount() == NUMBER_REP;
+ assert jobSubmissionParameters.getDfsJobSubmissionFolder().toString().equals(STRING_REP);
+ assert jobSubmissionParameters.getJobSubmissionDirectoryPrefix().toString().equals(STRING_REP);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/package-info.java b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/package-info.java
new file mode 100644
index 0000000..263d4cc
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Tests for the bridge client.
+ */
+package org.apache.reef.bridge.client;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
index 878e9b2..55452a4 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
@@ -43,7 +43,6 @@ import javax.inject.Inject;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.Collection;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -191,36 +190,6 @@ public final class REEFLauncher {
}
}
- /**
- * Pass values of the properties specified in the propNames array as <code>-D...</code>
- * command line parameters. Currently used only to pass logging configuration to child JVMs processes.
- *
- * @param vargs List of command line parameters to append to.
- * @param copyNull create an empty parameter if the property is missing in current process.
- * @param propNames property names.
- */
- public static void propagateProperties(
- final Collection<String> vargs, final boolean copyNull, final String... propNames) {
- for (final String propName : propNames) {
- final String propValue = System.getProperty(propName);
- if (propValue == null || propValue.isEmpty()) {
- if (copyNull) {
- vargs.add("-D" + propName);
- }
- } else {
- vargs.add(String.format("-D%s=%s", propName, propValue));
- }
- }
- }
-
- /**
- * Same as above, but with copyNull == false by default.
- */
- public static void propagateProperties(
- final Collection<String> vargs, final String... propNames) {
- propagateProperties(vargs, false, propNames);
- }
-
private void logVersion() {
this.reefVersion.logVersion();
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
index be25102..5de19c4 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
@@ -51,6 +51,7 @@ public final class REEFFileNames {
private static final String BRIDGE_EXE_NAME = "Org.Apache.REEF.Bridge.exe";
private static final String SECURITY_TOKEN_IDENTIFIER_FILE = "SecurityTokenId";
private static final String SECURITY_TOKEN_PASSWORD_FILE = "SecurityTokenPwd";
+ private static final String YARN_BOOTSTRAP_PARAM_FILE = "yarnparameters.json";
@Inject
public REEFFileNames() {
@@ -231,4 +232,16 @@ public final class REEFFileNames {
public String getSecurityTokenPasswordFile() {
return SECURITY_TOKEN_PASSWORD_FILE;
}
+
+ /**
+ * @return File name that contains the bootstrap parameters for YARN job submission
+ * without Java dependency.
+ */
+ public String getYarnBootstrapParamFile() {
+ return YARN_BOOTSTRAP_PARAM_FILE;
+ }
+
+ public String getYarnBootstrapParamFilePath() {
+ return LOCAL_FOLDER_PATH + '/' + getYarnBootstrapParamFile();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d5543395/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
index 9484249..e6f74ab 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
@@ -46,23 +46,35 @@ public final class JavaLaunchCommandBuilder implements LaunchCommandBuilder {
private String javaPath = null;
private String classPath = null;
private Boolean assertionsEnabled = null;
- private Map<String, JVMOption> options = new HashMap<>();
+ private final Map<String, JVMOption> options = new HashMap<>();
private final List<String> commandPrefixList;
+ private final Class launcherClass;
/**
- * Constructor that populates default options.
+ * Constructor that populates default options, using the default Launcher
+ * class {@link REEFLauncher}.
*/
public JavaLaunchCommandBuilder() {
- this(null);
+ this(REEFLauncher.class, null);
}
/**
- * Constructor that populates prefix.
+ * Constructor that uses the default Launcher class, {@link REEFLauncher}.
+ * @param commandPrefixList the command prefix list to populate this builder with
*/
public JavaLaunchCommandBuilder(final List<String> commandPrefixList) {
+ this(REEFLauncher.class, commandPrefixList);
+ }
+
+ /**
+ * Constructor that populates prefix and uses a custom Launcher class.
+ */
+ public JavaLaunchCommandBuilder(final Class launcherClass, final List<String> commandPrefixList) {
for (final String defaultOption : DEFAULT_OPTIONS) {
addOption(defaultOption);
}
+
+ this.launcherClass = launcherClass;
this.commandPrefixList = commandPrefixList;
}
@@ -95,11 +107,11 @@ public final class JavaLaunchCommandBuilder implements LaunchCommandBuilder {
add(classPath);
}
- REEFLauncher.propagateProperties(this, true, "proc_reef");
- REEFLauncher.propagateProperties(this, false,
+ propagateProperties(this, true, "proc_reef");
+ propagateProperties(this, false,
"java.util.logging.config.file", "java.util.logging.config.class");
- add(REEFLauncher.class.getName());
+ add(launcherClass.getName());
add(evaluatorConfigurationPath);
if (stdoutPath != null && !stdoutPath.isEmpty()) {
@@ -168,6 +180,28 @@ public final class JavaLaunchCommandBuilder implements LaunchCommandBuilder {
return addOption(JVMOption.parse(option));
}
+ /**
+ * Pass values of the properties specified in the propNames array as <code>-D...</code>
+ * command line parameters. Currently used only to pass logging configuration to child JVM processes.
+ *
+ * @param vargs List of command line parameters to append to.
+ * @param copyNull create an empty parameter if the property is missing in current process.
+ * @param propNames property names.
+ */
+ private static void propagateProperties(
+ final Collection<String> vargs, final boolean copyNull, final String... propNames) {
+ for (final String propName : propNames) {
+ final String propValue = System.getProperty(propName);
+ if (propValue == null || propValue.isEmpty()) {
+ if (copyNull) {
+ vargs.add("-D" + propName);
+ }
+ } else {
+ vargs.add(String.format("-D%s=%s", propName, propValue));
+ }
+ }
+ }
+
private JavaLaunchCommandBuilder addOption(final JVMOption jvmOption) {
if (options.containsKey(jvmOption.option)) {
LOG.warning("Replaced option " + options.get(jvmOption.option) + " with " + jvmOption);