You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by af...@apache.org on 2016/02/10 20:27:20 UTC
[1/3] reef git commit: [REEF-1180] Split REEFClient submission
parameter file into per-application and per-job
Repository: reef
Updated Branches:
refs/heads/master 20369d4d4 -> 1137cdee0
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
index c79fe41..c77caf7 100644
--- a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
+++ b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
@@ -18,7 +18,9 @@
*/
package org.apache.reef.bridge.client;
+import org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters;
import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters;
+import org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters;
import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
@@ -45,30 +47,42 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS {
private static final String STRING_REP = "HelloREEF";
private static final String STRING_REP_QUOTED = "\"" + STRING_REP + "\"";
private static final long NUMBER_REP = 12345;
- private static final String AVRO_YARN_PARAMETERS_SERIALIZED_STRING =
+ private static final String AVRO_YARN_JOB_PARAMETERS_SERIALIZED_STRING =
"{" +
"\"sharedJobSubmissionParameters\":" +
"{" +
"\"jobId\":" + STRING_REP_QUOTED + "," +
- "\"tcpBeginPort\":" + NUMBER_REP + "," +
- "\"tcpRangeCount\":" + NUMBER_REP + "," +
- "\"tcpTryCount\":" + NUMBER_REP + "," +
"\"jobSubmissionFolder\":" + STRING_REP_QUOTED +
"}," +
- "\"driverMemory\":" + NUMBER_REP + "," +
- "\"driverRecoveryTimeout\":" + NUMBER_REP + "," +
"\"dfsJobSubmissionFolder\":\"" + STRING_REP + "\"," +
"\"jobSubmissionDirectoryPrefix\":" + STRING_REP_QUOTED +
"}";
- private static final String AVRO_YARN_CLUSTER_PARAMETERS_SERIALIZED_STRING =
+ private static final String AVRO_YARN_CLUSTER_JOB_PARAMETERS_SERIALIZED_STRING =
"{" +
- "\"yarnJobSubmissionParameters\":" + AVRO_YARN_PARAMETERS_SERIALIZED_STRING + "," +
- "\"maxApplicationSubmissions\":" + NUMBER_REP + "," +
+ "\"yarnJobSubmissionParameters\":" + AVRO_YARN_JOB_PARAMETERS_SERIALIZED_STRING + "," +
"\"securityTokenKind\":\"" + NULL_REP + "\"," +
"\"securityTokenService\":\"" + NULL_REP + "\"" +
"}";
+ private static final String AVRO_YARN_APP_PARAMETERS_SERIALIZED_STRING =
+ "{" +
+ "\"sharedAppSubmissionParameters\":" +
+ "{" +
+ "\"tcpBeginPort\":" + NUMBER_REP + "," +
+ "\"tcpRangeCount\":" + NUMBER_REP + "," +
+ "\"tcpTryCount\":" + NUMBER_REP +
+ "}," +
+ "\"driverMemory\":" + NUMBER_REP + "," +
+ "\"driverRecoveryTimeout\":" + NUMBER_REP +
+ "}";
+
+ private static final String AVRO_YARN_CLUSTER_APP_PARAMETERS_SERIALIZED_STRING =
+ "{" +
+ "\"yarnAppSubmissionParameters\":" + AVRO_YARN_APP_PARAMETERS_SERIALIZED_STRING + "," +
+ "\"maxApplicationSubmissions\":" + NUMBER_REP +
+ "}";
+
/**
* Tests deserialization of the Avro parameters for submission from the cluster from C#.
* @throws IOException
@@ -84,7 +98,8 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS {
assert yarnClusterSubmissionFromCS.getTokenKind().equals(NULL_REP);
assert yarnClusterSubmissionFromCS.getTokenService().equals(NULL_REP);
- verifyYarnJobSubmissionParams(yarnClusterSubmissionFromCS.getYarnJobSubmissionParameters());
+ verifyYarnJobSubmissionParams(yarnClusterSubmissionFromCS.getYarnJobSubmissionParameters(),
+ yarnClusterSubmissionFromCS.getYarnAppSubmissionParameters());
}
/**
@@ -93,7 +108,7 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS {
*/
@Test
public void testAvroYarnParametersDeserialization() throws IOException {
- verifyYarnJobSubmissionParams(createAvroYarnJobSubmissionParameters());
+ verifyYarnJobSubmissionParams(createAvroYarnJobSubmissionParameters(), createAvroYarnAppSubmissionParameters());
}
/**
@@ -102,13 +117,24 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS {
*/
@Test
public void testAvroYarnParametersSerialization() throws IOException {
- try (final ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
- YarnJobSubmissionParametersFileGenerator.writeAvroYarnJobSubmissionParametersToOutputStream(
- createYarnClusterSubmissionFromCS(), STRING_REP, outputStream);
- final byte[] content = outputStream.toByteArray();
- try (final InputStream stream = new ByteArrayInputStream(content)) {
- verifyYarnJobSubmissionParams(
- YarnBootstrapDriverConfigGenerator.readYarnJobSubmissionParametersFromInputStream(stream));
+ try (final ByteArrayOutputStream appOutputStream = new ByteArrayOutputStream()) {
+ try (final ByteArrayOutputStream jobOutputStream = new ByteArrayOutputStream()) {
+ final YarnClusterSubmissionFromCS clusterSubmissionFromCS = createYarnClusterSubmissionFromCS();
+ YarnSubmissionParametersFileGenerator.writeAvroYarnAppSubmissionParametersToOutputStream(
+ clusterSubmissionFromCS, appOutputStream);
+ YarnSubmissionParametersFileGenerator.writeAvroYarnJobSubmissionParametersToOutputStream(
+ clusterSubmissionFromCS, STRING_REP, jobOutputStream);
+
+ final byte[] jobContent = jobOutputStream.toByteArray();
+ final byte[] appContent = appOutputStream.toByteArray();
+
+ try (final InputStream appStream = new ByteArrayInputStream(appContent)) {
+ try (final InputStream jobStream = new ByteArrayInputStream(jobContent)) {
+ verifyYarnJobSubmissionParams(
+ YarnBootstrapDriverConfigGenerator.readYarnJobSubmissionParametersFromInputStream(jobStream),
+ YarnBootstrapDriverConfigGenerator.readYarnAppSubmissionParametersFromInputStream(appStream));
+ }
+ }
}
}
}
@@ -121,7 +147,8 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS {
@Test
public void testYarnBootstrapDriverConfigGenerator() throws IOException, InjectionException {
final Configuration yarnBootstrapDriverConfig =
- YarnBootstrapDriverConfigGenerator.getYarnDriverConfiguration(createAvroYarnJobSubmissionParameters());
+ YarnBootstrapDriverConfigGenerator.getYarnDriverConfiguration(
+ createAvroYarnJobSubmissionParameters(), createAvroYarnAppSubmissionParameters());
final Injector injector = Tang.Factory.getTang().newInjector(yarnBootstrapDriverConfig);
assert injector.getNamedInstance(JobSubmissionDirectory.class).equals(STRING_REP);
@@ -132,29 +159,45 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS {
assert injector.getNamedInstance(TcpPortRangeTryCount.class) == NUMBER_REP;
}
+ private static AvroYarnAppSubmissionParameters createAvroYarnAppSubmissionParameters() throws IOException {
+ try (final InputStream stream =
+ new ByteArrayInputStream(AVRO_YARN_APP_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) {
+ return YarnBootstrapDriverConfigGenerator.readYarnAppSubmissionParametersFromInputStream(stream);
+ }
+ }
+
private static AvroYarnJobSubmissionParameters createAvroYarnJobSubmissionParameters() throws IOException {
try (final InputStream stream =
- new ByteArrayInputStream(AVRO_YARN_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) {
+ new ByteArrayInputStream(AVRO_YARN_JOB_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) {
return YarnBootstrapDriverConfigGenerator.readYarnJobSubmissionParametersFromInputStream(stream);
}
}
private static YarnClusterSubmissionFromCS createYarnClusterSubmissionFromCS() throws IOException {
- try (final InputStream stream =
+ try (final InputStream appStream =
new ByteArrayInputStream(
- AVRO_YARN_CLUSTER_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) {
- return YarnClusterSubmissionFromCS.readYarnClusterSubmissionFromCSFromInputStream(stream);
+ AVRO_YARN_CLUSTER_APP_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) {
+ try (final InputStream jobStream =
+ new ByteArrayInputStream(
+ AVRO_YARN_CLUSTER_JOB_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) {
+ return YarnClusterSubmissionFromCS.readYarnClusterSubmissionFromCSFromInputStream(appStream, jobStream);
+ }
}
}
- private static void verifyYarnJobSubmissionParams(final AvroYarnJobSubmissionParameters jobSubmissionParameters) {
+ private static void verifyYarnJobSubmissionParams(final AvroYarnJobSubmissionParameters jobSubmissionParameters,
+ final AvroYarnAppSubmissionParameters appSubmissionParameters) {
+ final AvroAppSubmissionParameters sharedAppSubmissionParams =
+ appSubmissionParameters.getSharedAppSubmissionParameters();
+
final AvroJobSubmissionParameters sharedJobSubmissionParams =
jobSubmissionParameters.getSharedJobSubmissionParameters();
+
+ assert sharedAppSubmissionParams.getTcpBeginPort() == NUMBER_REP;
+ assert sharedAppSubmissionParams.getTcpRangeCount() == NUMBER_REP;
+ assert sharedAppSubmissionParams.getTcpTryCount() == NUMBER_REP;
assert sharedJobSubmissionParams.getJobId().toString().equals(STRING_REP);
assert sharedJobSubmissionParams.getJobSubmissionFolder().toString().equals(STRING_REP);
- assert sharedJobSubmissionParams.getTcpBeginPort() == NUMBER_REP;
- assert sharedJobSubmissionParams.getTcpRangeCount() == NUMBER_REP;
- assert sharedJobSubmissionParams.getTcpTryCount() == NUMBER_REP;
assert jobSubmissionParameters.getDfsJobSubmissionFolder().toString().equals(STRING_REP);
assert jobSubmissionParameters.getJobSubmissionDirectoryPrefix().toString().equals(STRING_REP);
}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/CLRProcess.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/CLRProcess.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/CLRProcess.java
index 2faff66..1219503 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/CLRProcess.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/CLRProcess.java
@@ -20,6 +20,7 @@ package org.apache.reef.driver.evaluator;
import org.apache.reef.runtime.common.launch.CLRLaunchCommandBuilder;
+import java.util.Collections;
import java.util.List;
/**
@@ -60,7 +61,7 @@ public final class CLRProcess implements EvaluatorProcess {
@Override
public CLRProcess setConfigurationFileName(final String configurationFileName) {
- commandBuilder.setConfigurationFileName(configurationFileName);
+ commandBuilder.setConfigurationFilePaths(Collections.singletonList(configurationFileName));
return this;
}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/JVMProcess.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/JVMProcess.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/JVMProcess.java
index 98f4d14..9b4a599 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/JVMProcess.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/JVMProcess.java
@@ -22,6 +22,7 @@ import org.apache.reef.runtime.common.files.ClasspathProvider;
import org.apache.reef.runtime.common.files.RuntimePathProvider;
import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
+import java.util.Collections;
import java.util.List;
/**
@@ -72,7 +73,7 @@ public final class JVMProcess implements EvaluatorProcess {
@Override
public JVMProcess setConfigurationFileName(final String configurationFileName) {
- commandBuilder.setConfigurationFileName(configurationFileName);
+ commandBuilder.setConfigurationFilePaths(Collections.singletonList(configurationFileName));
return this;
}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
index 5de19c4..a51ad1c 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
@@ -51,7 +51,8 @@ public final class REEFFileNames {
private static final String BRIDGE_EXE_NAME = "Org.Apache.REEF.Bridge.exe";
private static final String SECURITY_TOKEN_IDENTIFIER_FILE = "SecurityTokenId";
private static final String SECURITY_TOKEN_PASSWORD_FILE = "SecurityTokenPwd";
- private static final String YARN_BOOTSTRAP_PARAM_FILE = "yarnparameters.json";
+ private static final String YARN_BOOTSTRAP_APP_PARAM_FILE = "yarn-app-parameters.json";
+ private static final String YARN_BOOTSTRAP_JOB_PARAM_FILE = "yarn-job-parameters.json";
@Inject
public REEFFileNames() {
@@ -234,14 +235,32 @@ public final class REEFFileNames {
}
/**
- * @return File name the contains the bootstrap parameters for YARN job submission
+ * @return File name that contains the bootstrap application parameters for YARN job submission
* without Java dependency.
*/
- public String getYarnBootstrapParamFile() {
- return YARN_BOOTSTRAP_PARAM_FILE;
+ public String getYarnBootstrapAppParamFile() {
+ return YARN_BOOTSTRAP_APP_PARAM_FILE;
}
- public String getYarnBootstrapParamFilePath() {
- return LOCAL_FOLDER_PATH + '/' + getYarnBootstrapParamFile();
+ /**
+ * @return File name that contains the bootstrap job parameters for YARN job submission
+ * without Java dependency.
+ */
+ public String getYarnBootstrapJobParamFile() {
+ return YARN_BOOTSTRAP_JOB_PARAM_FILE;
+ }
+
+ /**
+ * @return Path to the bootstrap application parameters file for YARN job submission without Java dependency.
+ */
+ public String getYarnBootstrapAppParamFilePath() {
+ return LOCAL_FOLDER_PATH + '/' + getYarnBootstrapAppParamFile();
+ }
+
+ /**
+ * @return Path to the bootstrap job parameters file for YARN job submission without Java dependency.
+ */
+ public String getYarnBootstrapJobParamFilePath() {
+ return getYarnBootstrapJobParamFile();
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/CLRLaunchCommandBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/CLRLaunchCommandBuilder.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/CLRLaunchCommandBuilder.java
index 09c6646..9ad9c60 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/CLRLaunchCommandBuilder.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/CLRLaunchCommandBuilder.java
@@ -19,6 +19,7 @@
package org.apache.reef.runtime.common.launch;
import org.apache.commons.lang.StringUtils;
+import org.apache.reef.util.Optional;
import java.io.File;
import java.util.LinkedList;
@@ -33,11 +34,10 @@ public class CLRLaunchCommandBuilder implements LaunchCommandBuilder {
private static final Logger LOG = Logger.getLogger(CLRLaunchCommandBuilder.class.getName());
private static final String EVALUATOR_PATH = "reef/global/Org.Apache.Reef.Evaluator.exe";
-
private String standardErrPath = null;
private String standardOutPath = null;
private int megaBytes = 0;
- private String evaluatorConfigurationPath = null;
+ private Optional<List<String>> evaluatorConfigurationPaths = Optional.empty();
@Override
public List<String> build() {
@@ -47,7 +47,11 @@ public class CLRLaunchCommandBuilder implements LaunchCommandBuilder {
LOG.log(Level.WARNING, "file can NOT be found: {0}", f.getAbsolutePath());
}
result.add(f.getPath());
- result.add(evaluatorConfigurationPath);
+ if (evaluatorConfigurationPaths.isPresent()) {
+ for (final String evaluatorConfigurationPath : evaluatorConfigurationPaths.get()) {
+ result.add(evaluatorConfigurationPath);
+ }
+ }
if (null != this.standardOutPath && !standardOutPath.isEmpty()) {
result.add(">" + this.standardOutPath);
}
@@ -66,8 +70,8 @@ public class CLRLaunchCommandBuilder implements LaunchCommandBuilder {
}
@Override
- public CLRLaunchCommandBuilder setConfigurationFileName(final String configurationFileName) {
- this.evaluatorConfigurationPath = configurationFileName;
+ public CLRLaunchCommandBuilder setConfigurationFilePaths(final List<String> configurationFilePaths) {
+ this.evaluatorConfigurationPaths = Optional.of(configurationFilePaths);
return this;
}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
index e6f74ab..bf77ee4 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
@@ -21,6 +21,7 @@ package org.apache.reef.runtime.common.launch;
import org.apache.commons.lang.StringUtils;
import org.apache.reef.runtime.common.REEFLauncher;
import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.util.Optional;
import java.io.File;
import java.util.ArrayList;
@@ -42,7 +43,7 @@ public final class JavaLaunchCommandBuilder implements LaunchCommandBuilder {
private static final String[] DEFAULT_OPTIONS = {"-XX:PermSize=128m", "-XX:MaxPermSize=128m"};
private String stderrPath = null;
private String stdoutPath = null;
- private String evaluatorConfigurationPath = null;
+ private Optional<List<String>> evaluatorConfigurationPaths = Optional.empty();
private String javaPath = null;
private String classPath = null;
private Boolean assertionsEnabled = null;
@@ -112,7 +113,11 @@ public final class JavaLaunchCommandBuilder implements LaunchCommandBuilder {
"java.util.logging.config.file", "java.util.logging.config.class");
add(launcherClass.getName());
- add(evaluatorConfigurationPath);
+ if (evaluatorConfigurationPaths.isPresent()) {
+ for (final String configurationPath : evaluatorConfigurationPaths.get()) {
+ add(configurationPath);
+ }
+ }
if (stdoutPath != null && !stdoutPath.isEmpty()) {
add("1>");
@@ -133,8 +138,8 @@ public final class JavaLaunchCommandBuilder implements LaunchCommandBuilder {
}
@Override
- public JavaLaunchCommandBuilder setConfigurationFileName(final String configurationFileName) {
- this.evaluatorConfigurationPath = configurationFileName;
+ public JavaLaunchCommandBuilder setConfigurationFilePaths(final List<String> configurationPaths) {
+ this.evaluatorConfigurationPaths = Optional.of(configurationPaths);
return this;
}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchCommandBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchCommandBuilder.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchCommandBuilder.java
index 0768298..396c9c8 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchCommandBuilder.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchCommandBuilder.java
@@ -42,10 +42,10 @@ public interface LaunchCommandBuilder {
* Set the name of the configuration file for the Launcher. This file is assumed to exist in the working directory of
* the process launched with this command line.
*
- * @param configurationFileName
+ * @param configurationFilePaths
* @return this
*/
- LaunchCommandBuilder setConfigurationFileName(final String configurationFileName);
+ LaunchCommandBuilder setConfigurationFilePaths(final List<String> configurationFilePaths);
/**
* Names a file to which stdout will be redirected.
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilderTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilderTest.java b/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilderTest.java
index cfe7872..fbcf820 100644
--- a/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilderTest.java
+++ b/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilderTest.java
@@ -20,6 +20,7 @@ package org.apache.reef.runtime.common.launch;
import org.junit.Test;
+import java.util.Collections;
import java.util.List;
import static org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder.JVMOption;
@@ -167,6 +168,7 @@ public final class JavaLaunchCommandBuilderTest {
}
private static JavaLaunchCommandBuilder newBuilder() {
- return new JavaLaunchCommandBuilder().setConfigurationFileName("mockConfigurationFileName");
+ return new JavaLaunchCommandBuilder()
+ .setConfigurationFilePaths(Collections.singletonList("mockConfigurationFileName"));
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
index dfab9c2..8fd0594 100644
--- a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
@@ -37,6 +37,7 @@ import org.apache.reef.tang.annotations.Parameter;
import javax.inject.Inject;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -149,7 +150,7 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler
return new JavaLaunchCommandBuilder()
.setJavaPath("%JAVA_HOME%/bin/java")
- .setConfigurationFileName(this.filenames.getDriverConfigurationPath())
+ .setConfigurationFilePaths(Collections.singletonList(this.filenames.getDriverConfigurationPath()))
.setClassPath(this.classpath.getDriverClasspath())
.setMemory(jobSubmissionEvent.getDriverMemory().get())
.setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStderrFileName())
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/PreparedDriverFolderLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/PreparedDriverFolderLauncher.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/PreparedDriverFolderLauncher.java
index c53dc7e..e324cea 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/PreparedDriverFolderLauncher.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/PreparedDriverFolderLauncher.java
@@ -28,6 +28,7 @@ import org.apache.reef.tang.annotations.Parameter;
import javax.inject.Inject;
import java.io.File;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
@@ -91,7 +92,7 @@ public class PreparedDriverFolderLauncher {
private List<String> makeLaunchCommand(final String jobId, final String clientRemoteId) {
final List<String> command = new JavaLaunchCommandBuilder(commandPrefixList)
- .setConfigurationFileName(this.fileNames.getDriverConfigurationPath())
+ .setConfigurationFilePaths(Collections.singletonList(this.fileNames.getDriverConfigurationPath()))
.setClassPath(this.classpath.getDriverClasspath())
.setMemory(DRIVER_MEMORY)
.build();
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
index 5c0a3f8..880d30c 100644
--- a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
@@ -40,6 +40,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -133,7 +134,7 @@ final class MesosJobSubmissionHandler implements JobSubmissionHandler {
this.configurationSerializer.toFile(driverConfiguration, runtimeConfigurationFile);
final List<String> launchCommand = new JavaLaunchCommandBuilder()
- .setConfigurationFileName(this.fileNames.getDriverConfigurationPath())
+ .setConfigurationFilePaths(Collections.singletonList(this.fileNames.getDriverConfigurationPath()))
.setClassPath(this.classpath.getDriverClasspath())
.setMemory(jobSubmissionEvent.getDriverMemory().get())
.build();
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
index f006e83..faf8e1f 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
@@ -20,6 +20,7 @@ package org.apache.reef.runtime.yarn.client;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.reef.annotations.audience.ClientSide;
@@ -104,7 +105,8 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
: this.uploader.createJobFolder(submissionHelper.getApplicationId());
final Configuration driverConfiguration = makeDriverConfiguration(jobSubmissionEvent, jobFolderOnDfs.getPath());
final File jobSubmissionFile = this.jobJarMaker.createJobSubmissionJAR(jobSubmissionEvent, driverConfiguration);
- final LocalResource driverJarOnDfs = jobFolderOnDfs.uploadAsLocalResource(jobSubmissionFile);
+ final LocalResource driverJarOnDfs =
+ jobFolderOnDfs.uploadAsLocalResource(jobSubmissionFile, LocalResourceType.ARCHIVE);
submissionHelper
.addLocalResource(this.fileNames.getREEFFolderName(), driverJarOnDfs)
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
index c424fbc..0847f4d 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
@@ -34,9 +34,7 @@ import org.apache.reef.runtime.yarn.util.YarnTypes;
import java.io.Closeable;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -57,7 +55,7 @@ public final class YarnSubmissionHelper implements Closeable{
private final SecurityTokenProvider tokenProvider;
private final List<String> commandPrefixList;
private Class launcherClazz;
- private String confFileName;
+ private List<String> configurationFilePaths;
public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
final REEFFileNames fileNames,
@@ -81,7 +79,7 @@ public final class YarnSubmissionHelper implements Closeable{
this.tokenProvider = tokenProvider;
this.commandPrefixList = commandPrefixList;
this.launcherClazz = REEFLauncher.class;
- this.confFileName = this.fileNames.getDriverConfigurationPath();
+ this.configurationFilePaths = Collections.singletonList(this.fileNames.getDriverConfigurationPath());
LOG.log(Level.FINEST, "YARN Application ID: {0}", applicationId);
}
@@ -92,12 +90,10 @@ public final class YarnSubmissionHelper implements Closeable{
this(yarnConfiguration, fileNames, classpath, tokenProvider, null);
}
-
-
- /**
- *
- * @return the application ID assigned by YARN.
- */
+ /**
+ *
+ * @return the application ID assigned by YARN.
+ */
public int getApplicationId() {
return this.applicationId.getId();
}
@@ -214,21 +210,22 @@ public final class YarnSubmissionHelper implements Closeable{
/**
* Sets the configuration file for the job.
- * Note that this does not have to be the Driver TANG configuration. In the bootstrap
- * launch case, this can be the Avro file that supports the generation of a driver
+ * Note that this does not have to be the Driver TANG configuration. In the bootstrap
+ * launch case, this can be the set of Avro files that support the generation of a driver
* configuration file natively at the Launcher.
- * @param configurationFileName
+ * @param configurationFilePaths
* @return
*/
- public YarnSubmissionHelper setConfigurationFileName(final String configurationFileName) {
- this.confFileName = configurationFileName;
+ public YarnSubmissionHelper setConfigurationFilePaths(final List<String> configurationFilePaths) {
+ this.configurationFilePaths = configurationFilePaths;
return this;
}
public void submit() throws IOException, YarnException {
+
// SET EXEC COMMAND
final List<String> launchCommand = new JavaLaunchCommandBuilder(launcherClazz, commandPrefixList)
- .setConfigurationFileName(confFileName)
+ .setConfigurationFilePaths(configurationFilePaths)
.setClassPath(this.classpath.getDriverClasspath())
.setMemory(this.applicationSubmissionContext.getResource().getMemory())
.setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.fileNames.getDriverStdoutFileName())
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobFolder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobFolder.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobFolder.java
index a8b2c05..6de0561 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobFolder.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobFolder.java
@@ -88,18 +88,18 @@ public final class JobFolder {
* @return
* @throws IOException
*/
- public LocalResource uploadAsLocalResource(final File localFile) throws IOException {
+ public LocalResource uploadAsLocalResource(final File localFile, final LocalResourceType type) throws IOException {
final Path p = upload(localFile);
- return getLocalResourceForPath(p);
+ return getLocalResourceForPath(p, type);
}
/**
* Creates a LocalResource instance for the JAR file referenced by the given Path.
*/
- public LocalResource getLocalResourceForPath(final Path jarPath) throws IOException {
+ public LocalResource getLocalResourceForPath(final Path jarPath, final LocalResourceType type) throws IOException {
final LocalResource localResource = Records.newRecord(LocalResource.class);
final FileStatus status = FileContext.getFileContext(fileSystem.getUri()).getFileStatus(jarPath);
- localResource.setType(LocalResourceType.ARCHIVE);
+ localResource.setType(type);
localResource.setVisibility(LocalResourceVisibility.APPLICATION);
localResource.setResource(ConverterUtils.getYarnUrlFromPath(status.getPath()));
localResource.setTimestamp(status.getModificationTime());
[2/3] reef git commit: [REEF-1180] Split REEFClient submission
parameter file into per-application and per-job
Posted by af...@apache.org.
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs
index bcff6d2..83e5ff1 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs
@@ -16,11 +16,15 @@
// under the License.
using System;
+using System.Collections.Generic;
using System.IO;
using Org.Apache.REEF.Client.Common;
using Org.Apache.REEF.Client.Yarn;
+using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
+using Org.Apache.REEF.Common.Files;
using Org.Apache.REEF.IO.FileSystem;
using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
namespace Org.Apache.REEF.Client.YARN.RestClient
@@ -36,41 +40,72 @@ namespace Org.Apache.REEF.Client.YARN.RestClient
private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, 0);
private readonly IResourceArchiveFileGenerator _resourceArchiveFileGenerator;
private readonly IFileSystem _fileSystem;
+ private readonly REEFFileNames _reefFileNames;
+ private readonly IFile _file;
[Inject]
private FileSystemJobResourceUploader(
IResourceArchiveFileGenerator resourceArchiveFileGenerator,
- IFileSystem fileSystem)
+ IFileSystem fileSystem,
+ REEFFileNames reefFileNames,
+ IFile file)
{
_fileSystem = fileSystem;
_resourceArchiveFileGenerator = resourceArchiveFileGenerator;
+ _reefFileNames = reefFileNames;
+ _file = file;
}
- public JobResource UploadJobResource(string driverLocalFolderPath, string jobSubmissionDirectory)
+ public JobResource UploadArchiveResource(string driverLocalFolderPath, string remoteUploadDirectoryPath)
{
driverLocalFolderPath = driverLocalFolderPath.TrimEnd('\\') + @"\";
- var driverUploadPath = jobSubmissionDirectory.TrimEnd('/') + @"/";
+ var driverUploadPath = remoteUploadDirectoryPath.TrimEnd('/') + @"/";
+ var parentDirectoryUri = _fileSystem.CreateUriForPath(remoteUploadDirectoryPath);
Log.Log(Level.Verbose, "DriverFolderPath: {0} DriverUploadPath: {1}", driverLocalFolderPath, driverUploadPath);
- var archivePath = _resourceArchiveFileGenerator.CreateArchiveToUpload(driverLocalFolderPath);
+
+ _fileSystem.CreateDirectory(parentDirectoryUri);
- var destinationPath = driverUploadPath + Path.GetFileName(archivePath);
- var remoteFileUri = _fileSystem.CreateUriForPath(destinationPath);
- Log.Log(Level.Verbose, @"Copy {0} to {1}", archivePath, remoteFileUri);
+ var archivePath = _resourceArchiveFileGenerator.CreateArchiveToUpload(driverLocalFolderPath);
+ return GetJobResource(archivePath, ResourceType.ARCHIVE, driverUploadPath);
+ }
+ public JobResource UploadFileResource(string fileLocalPath, string remoteUploadDirectoryPath)
+ {
+ var driverUploadPath = remoteUploadDirectoryPath.TrimEnd('/') + @"/";
var parentDirectoryUri = _fileSystem.CreateUriForPath(driverUploadPath);
+
_fileSystem.CreateDirectory(parentDirectoryUri);
- _fileSystem.CopyFromLocal(archivePath, remoteFileUri);
+ return GetJobResource(fileLocalPath, ResourceType.FILE, driverUploadPath); // slash-terminated: GetJobResource appends the file name directly
+ }
+
+ private JobResource GetJobResource(string filePath, ResourceType resourceType, string driverUploadPath)
+ {
+ if (!_file.Exists(filePath))
+ {
+ Exceptions.Throw(
+ new FileNotFoundException("Could not find resource file " + filePath),
+ Log);
+ }
+
+ var destinationPath = driverUploadPath + Path.GetFileName(filePath);
+ var remoteFileUri = _fileSystem.CreateUriForPath(destinationPath);
+
+ Log.Log(Level.Verbose, @"Copy {0} to {1}", filePath, remoteFileUri);
+
+ _fileSystem.CopyFromLocal(filePath, remoteFileUri);
var fileStatus = _fileSystem.GetFileStatus(remoteFileUri);
return new JobResource
{
+ Name = Path.GetFileNameWithoutExtension(filePath),
LastModificationUnixTimestamp = DateTimeToUnixTimestamp(fileStatus.ModificationTime),
RemoteUploadPath = remoteFileUri.AbsoluteUri,
- ResourceSize = fileStatus.LengthBytes
+ ResourceSize = fileStatus.LengthBytes,
+ ResourceType = resourceType
};
}
- private long DateTimeToUnixTimestamp(DateTime dateTime)
+ private static long DateTimeToUnixTimestamp(DateTime dateTime)
{
return (long)(dateTime - Epoch).TotalSeconds;
}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/WindowsYarnJobCommandProvider.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/WindowsYarnJobCommandProvider.cs b/lang/cs/Org.Apache.REEF.Client/YARN/WindowsYarnJobCommandProvider.cs
index 6a14c5c..21e3f7d 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/WindowsYarnJobCommandProvider.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/WindowsYarnJobCommandProvider.cs
@@ -91,11 +91,12 @@ namespace Org.Apache.REEF.Client.YARN
}
sb.Append(" " + LauncherClassName);
+ sb.Append(" " + _fileNames.GetJobSubmissionParametersFile());
sb.Append(" " +
string.Format("{0}/{1}/{2}",
_fileNames.GetReefFolderName(),
_fileNames.GetLocalFolderName(),
- _fileNames.GetJobSubmissionParametersFile()));
+ _fileNames.GetAppSubmissionParametersFile()));
sb.Append(" " + _fileNames.GetDriverLoggingConfigCommand());
return sb.ToString();
}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs
index ce228ef..81912ea 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs
@@ -21,20 +21,14 @@ using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Org.Apache.REEF.Client.API;
-using Org.Apache.REEF.Client.Avro;
-using Org.Apache.REEF.Client.Avro.YARN;
using Org.Apache.REEF.Client.Common;
using Org.Apache.REEF.Client.Yarn.RestClient;
using Org.Apache.REEF.Client.YARN;
-using Org.Apache.REEF.Client.YARN.Parameters;
using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
-using Org.Apache.REEF.Common.Avro;
using Org.Apache.REEF.Common.Files;
-using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Remote.Parameters;
namespace Org.Apache.REEF.Client.Yarn
{
@@ -48,11 +42,9 @@ namespace Org.Apache.REEF.Client.Yarn
private static readonly Logger Logger = Logger.GetLogger(typeof(YarnREEFClient));
private readonly DriverFolderPreparationHelper _driverFolderPreparationHelper;
private readonly IJavaClientLauncher _javaClientLauncher;
- private readonly string _securityTokenKind;
- private readonly string _securityTokenService;
- private readonly string _jobSubmissionPrefix;
private readonly REEFFileNames _fileNames;
private readonly IYarnRMClient _yarnClient;
+ private readonly YarnREEFParamSerializer _paramSerializer;
[Inject]
internal YarnREEFClient(IJavaClientLauncher javaClientLauncher,
@@ -60,18 +52,14 @@ namespace Org.Apache.REEF.Client.Yarn
REEFFileNames fileNames,
YarnCommandLineEnvironment yarn,
IYarnRMClient yarnClient,
- [Parameter(typeof(SecurityTokenKindParameter))] string securityTokenKind,
- [Parameter(typeof(SecurityTokenServiceParameter))] string securityTokenService,
- [Parameter(typeof(JobSubmissionDirectoryPrefixParameter))] string jobSubmissionPrefix)
+ YarnREEFParamSerializer paramSerializer)
{
- _jobSubmissionPrefix = jobSubmissionPrefix;
- _securityTokenKind = securityTokenKind;
- _securityTokenService = securityTokenService;
_javaClientLauncher = javaClientLauncher;
_javaClientLauncher.AddToClassPath(yarn.GetYarnClasspathList());
_driverFolderPreparationHelper = driverFolderPreparationHelper;
_fileNames = fileNames;
_yarnClient = yarnClient;
+ _paramSerializer = paramSerializer;
}
public void Submit(IJobSubmission jobSubmission)
@@ -123,41 +111,12 @@ namespace Org.Apache.REEF.Client.Yarn
// TODO: Remove this when we have a generalized way to pass config to java
var paramInjector = TangFactory.GetTang().NewInjector(jobSubmission.DriverConfigurations.ToArray());
-
- var avroJobSubmissionParameters = new AvroJobSubmissionParameters
- {
- jobId = jobSubmission.JobIdentifier,
- tcpBeginPort = paramInjector.GetNamedInstance<TcpPortRangeStart, int>(),
- tcpRangeCount = paramInjector.GetNamedInstance<TcpPortRangeCount, int>(),
- tcpTryCount = paramInjector.GetNamedInstance<TcpPortRangeTryCount, int>(),
- jobSubmissionFolder = driverFolderPath
- };
-
- var avroYarnJobSubmissionParameters = new AvroYarnJobSubmissionParameters
- {
- driverMemory = jobSubmission.DriverMemory,
- driverRecoveryTimeout = paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds, int>(),
- jobSubmissionDirectoryPrefix = _jobSubmissionPrefix,
- sharedJobSubmissionParameters = avroJobSubmissionParameters
- };
-
- var avroYarnClusterJobSubmissionParameters = new AvroYarnClusterJobSubmissionParameters
- {
- maxApplicationSubmissions = paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.MaxApplicationSubmissions, int>(),
- securityTokenKind = _securityTokenKind,
- securityTokenService = _securityTokenService,
- yarnJobSubmissionParameters = avroYarnJobSubmissionParameters
- };
-
- var submissionArgsFilePath = Path.Combine(driverFolderPath, _fileNames.GetJobSubmissionParametersFile());
- using (var argsFileStream = new FileStream(submissionArgsFilePath, FileMode.CreateNew))
- {
- var serializedArgs = AvroJsonSerializer<AvroYarnClusterJobSubmissionParameters>.ToBytes(avroYarnClusterJobSubmissionParameters);
- argsFileStream.Write(serializedArgs, 0, serializedArgs.Length);
- }
+
+ var submissionJobArgsFilePath = _paramSerializer.SerializeJobFile(jobSubmission, driverFolderPath);
+ var submissionAppArgsFilePath = _paramSerializer.SerializeAppFile(jobSubmission, paramInjector, driverFolderPath);
// Submit the driver
- _javaClientLauncher.Launch(JavaClassName, submissionArgsFilePath);
+ _javaClientLauncher.Launch(JavaClassName, submissionJobArgsFilePath, submissionAppArgsFilePath);
Logger.Log(Level.Info, "Submitted the Driver for execution." + jobSubmission.JobIdentifier);
}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs
index 89dad2b..33ab5b9 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs
@@ -16,24 +16,21 @@
// under the License.
using System;
+using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Org.Apache.REEF.Client.API;
-using Org.Apache.REEF.Client.Avro;
-using Org.Apache.REEF.Client.Avro.YARN;
using Org.Apache.REEF.Client.Common;
using Org.Apache.REEF.Client.Yarn;
using Org.Apache.REEF.Client.Yarn.RestClient;
using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
-using Org.Apache.REEF.Common.Avro;
using Org.Apache.REEF.Common.Files;
using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Utilities.Attributes;
using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Remote.Parameters;
namespace Org.Apache.REEF.Client.YARN
{
@@ -53,6 +50,7 @@ namespace Org.Apache.REEF.Client.YARN
private readonly IYarnJobCommandProvider _yarnJobCommandProvider;
private readonly REEFFileNames _fileNames;
private readonly IJobSubmissionDirectoryProvider _jobSubmissionDirectoryProvider;
+ private readonly YarnREEFDotNetParamSerializer _paramSerializer;
[Inject]
private YarnREEFDotNetClient(
@@ -61,7 +59,8 @@ namespace Org.Apache.REEF.Client.YARN
IJobResourceUploader jobResourceUploader,
IYarnJobCommandProvider yarnJobCommandProvider,
REEFFileNames fileNames,
- IJobSubmissionDirectoryProvider jobSubmissionDirectoryProvider)
+ IJobSubmissionDirectoryProvider jobSubmissionDirectoryProvider,
+ YarnREEFDotNetParamSerializer paramSerializer)
{
_jobSubmissionDirectoryProvider = jobSubmissionDirectoryProvider;
_fileNames = fileNames;
@@ -69,6 +68,7 @@ namespace Org.Apache.REEF.Client.YARN
_jobResourceUploader = jobResourceUploader;
_driverFolderPreparationHelper = driverFolderPreparationHelper;
_yarnRMClient = yarnRMClient;
+ _paramSerializer = paramSerializer;
}
public void Submit(IJobSubmission jobSubmission)
@@ -94,40 +94,21 @@ namespace Org.Apache.REEF.Client.YARN
// prepare configuration
var paramInjector = TangFactory.GetTang().NewInjector(jobSubmission.DriverConfigurations.ToArray());
- int maxApplicationSubmissions =
+ var maxApplicationSubmissions =
paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.MaxApplicationSubmissions, int>();
- var avroJobSubmissionParameters = new AvroJobSubmissionParameters
- {
- jobId = jobSubmission.JobIdentifier,
- tcpBeginPort = paramInjector.GetNamedInstance<TcpPortRangeStart, int>(),
- tcpRangeCount = paramInjector.GetNamedInstance<TcpPortRangeCount, int>(),
- tcpTryCount = paramInjector.GetNamedInstance<TcpPortRangeTryCount, int>(),
- jobSubmissionFolder = localDriverFolderPath
- };
-
- var avroYarnJobSubmissionParameters = new AvroYarnJobSubmissionParameters
- {
- driverMemory = jobSubmission.DriverMemory,
- driverRecoveryTimeout =
- paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds, int>(),
- jobSubmissionDirectoryPrefix = jobSubmissionDirectory,
- dfsJobSubmissionFolder = jobSubmissionDirectory,
- sharedJobSubmissionParameters = avroJobSubmissionParameters
- };
-
- var submissionArgsFilePath = Path.Combine(localDriverFolderPath,
- _fileNames.GetLocalFolderPath(),
- _fileNames.GetJobSubmissionParametersFile());
- using (var argsFileStream = new FileStream(submissionArgsFilePath, FileMode.CreateNew))
- {
- var serializedArgs =
- AvroJsonSerializer<AvroYarnJobSubmissionParameters>.ToBytes(avroYarnJobSubmissionParameters);
- argsFileStream.Write(serializedArgs, 0, serializedArgs.Length);
- }
+ _paramSerializer.SerializeAppFile(jobSubmission, paramInjector, localDriverFolderPath);
+ _paramSerializer.SerializeJobFile(jobSubmission, localDriverFolderPath, jobSubmissionDirectory);
+
+ var archiveResource = _jobResourceUploader.UploadArchiveResource(localDriverFolderPath, jobSubmissionDirectory);
+
+ // Path to the job args file.
+ var jobArgsFilePath = Path.Combine(localDriverFolderPath, _fileNames.GetJobSubmissionParametersFile());
+
+ var argFileResource = _jobResourceUploader.UploadFileResource(jobArgsFilePath, jobSubmissionDirectory);
// upload prepared folder to DFS
- var jobResource = _jobResourceUploader.UploadJobResource(localDriverFolderPath, jobSubmissionDirectory);
+ var jobResources = new List<JobResource> { archiveResource, argFileResource };
// submit job
Log.Log(Level.Verbose, @"Assigned application id {0}", applicationId);
@@ -135,7 +116,7 @@ namespace Org.Apache.REEF.Client.YARN
var submissionReq = CreateApplicationSubmissionRequest(jobSubmission,
applicationId,
maxApplicationSubmissions,
- jobResource);
+ jobResources);
var submittedApplication = _yarnRMClient.SubmitApplicationAsync(submissionReq).GetAwaiter().GetResult();
Log.Log(Level.Info, @"Submitted application {0}", submittedApplication.Id);
}
@@ -163,13 +144,16 @@ namespace Org.Apache.REEF.Client.YARN
IJobSubmission jobSubmission,
string appId,
int maxApplicationSubmissions,
- JobResource jobResource)
+ IReadOnlyCollection<JobResource> jobResources)
{
string command = _yarnJobCommandProvider.GetJobSubmissionCommand();
Log.Log(Level.Verbose, "Command for YARN: {0}", command);
Log.Log(Level.Verbose, "ApplicationID: {0}", appId);
Log.Log(Level.Verbose, "MaxApplicationSubmissions: {0}", maxApplicationSubmissions);
- Log.Log(Level.Verbose, "Driver archive location: {0}", jobResource.RemoteUploadPath);
+ foreach (var jobResource in jobResources)
+ {
+ Log.Log(Level.Verbose, "Remote file: {0}", jobResource.RemoteUploadPath);
+ }
var submitApplication = new SubmitApplication
{
@@ -188,24 +172,7 @@ namespace Org.Apache.REEF.Client.YARN
UnmanagedAM = false,
AmContainerSpec = new AmContainerSpec
{
- LocalResources = new LocalResources
- {
- Entries = new[]
- {
- new KeyValuePair<string, LocalResourcesValue>
- {
- Key = _fileNames.GetReefFolderName(),
- Value = new LocalResourcesValue
- {
- Resource = jobResource.RemoteUploadPath,
- Type = ResourceType.ARCHIVE,
- Visibility = Visibility.APPLICATION,
- Size = jobResource.ResourceSize,
- Timestamp = jobResource.LastModificationUnixTimestamp
- }
- }
- }
- },
+ LocalResources = CreateLocalResources(jobResources),
Commands = new Commands
{
Command = command
@@ -216,11 +183,30 @@ namespace Org.Apache.REEF.Client.YARN
return submitApplication;
}
+ private static LocalResources CreateLocalResources(IEnumerable<JobResource> jobResources)
+ {
+ return new LocalResources
+ {
+ Entries = jobResources.Select(jobResource => new RestClient.DataModel.KeyValuePair<string, LocalResourcesValue>
+ {
+ Key = jobResource.Name,
+ Value = new LocalResourcesValue
+ {
+ Resource = jobResource.RemoteUploadPath,
+ Type = jobResource.ResourceType,
+ Visibility = Visibility.APPLICATION,
+ Size = jobResource.ResourceSize,
+ Timestamp = jobResource.LastModificationUnixTimestamp
+ }
+ }).ToArray()
+ };
+ }
+
/// <summary>
/// Creates the temporary directory to hold the job submission.
/// </summary>
/// <returns>The path to the folder created.</returns>
- private string CreateDriverFolder(string jobId, string appId)
+ private static string CreateDriverFolder(string jobId, string appId)
{
return Path.GetFullPath(Path.Combine(Path.GetTempPath(), string.Join("-", "reef", jobId, appId)));
}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs
new file mode 100644
index 0000000..dd28702
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.IO;
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.Avro;
+using Org.Apache.REEF.Client.Avro.YARN;
+using Org.Apache.REEF.Common.Avro;
+using Org.Apache.REEF.Common.Files;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Remote.Parameters;
+
+namespace Org.Apache.REEF.Client.YARN
+{
+ /// <summary>
+ /// Job/application parameter file serializer for <see cref="YarnREEFDotNetClient"/>.
+ /// </summary>
+ internal sealed class YarnREEFDotNetParamSerializer
+ {
+ private readonly REEFFileNames _fileNames;
+
+ [Inject]
+ private YarnREEFDotNetParamSerializer(REEFFileNames fileNames)
+ {
+ _fileNames = fileNames;
+ }
+
+ /// <summary>
+ /// Serializes the application parameters to reef/local/app-submission-params.json.
+ /// </summary>
+ internal void SerializeAppFile(IJobSubmission jobSubmission, IInjector paramInjector, string localDriverFolderPath)
+ {
+ var serializedArgs = SerializeAppArgsToBytes(jobSubmission, paramInjector, localDriverFolderPath);
+
+ var submissionAppArgsFilePath = Path.Combine(
+ localDriverFolderPath, _fileNames.GetLocalFolderPath(), _fileNames.GetAppSubmissionParametersFile());
+
+ using (var jobArgsFileStream = new FileStream(submissionAppArgsFilePath, FileMode.CreateNew))
+ {
+ jobArgsFileStream.Write(serializedArgs, 0, serializedArgs.Length);
+ }
+ }
+
+ internal byte[] SerializeAppArgsToBytes(IJobSubmission jobSubmission, IInjector paramInjector, string localDriverFolderPath)
+ {
+ var avroAppSubmissionParameters = new AvroAppSubmissionParameters
+ {
+ tcpBeginPort = paramInjector.GetNamedInstance<TcpPortRangeStart, int>(),
+ tcpRangeCount = paramInjector.GetNamedInstance<TcpPortRangeCount, int>(),
+ tcpTryCount = paramInjector.GetNamedInstance<TcpPortRangeTryCount, int>()
+ };
+
+ var avroYarnAppSubmissionParameters = new AvroYarnAppSubmissionParameters
+ {
+ sharedAppSubmissionParameters = avroAppSubmissionParameters,
+ driverMemory = jobSubmission.DriverMemory,
+ driverRecoveryTimeout =
+ paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds, int>(),
+ };
+
+ return AvroJsonSerializer<AvroYarnAppSubmissionParameters>.ToBytes(avroYarnAppSubmissionParameters);
+ }
+
+ /// <summary>
+ /// Serializes the job parameters to job-submission-params.json.
+ /// </summary>
+ internal void SerializeJobFile(IJobSubmission jobSubmission, string localDriverFolderPath, string jobSubmissionDirectory)
+ {
+ var serializedArgs = SerializeJobArgsToBytes(jobSubmission, localDriverFolderPath, jobSubmissionDirectory);
+
+ var submissionJobArgsFilePath = Path.Combine(localDriverFolderPath,
+ _fileNames.GetJobSubmissionParametersFile());
+
+ using (var jobArgsFileStream = new FileStream(submissionJobArgsFilePath, FileMode.CreateNew))
+ {
+ jobArgsFileStream.Write(serializedArgs, 0, serializedArgs.Length);
+ }
+ }
+
+ internal byte[] SerializeJobArgsToBytes(IJobSubmission jobSubmission, string localDriverFolderPath, string jobSubmissionDirectory)
+ {
+ var avroJobSubmissionParameters = new AvroJobSubmissionParameters
+ {
+ jobId = jobSubmission.JobIdentifier,
+ jobSubmissionFolder = localDriverFolderPath
+ };
+
+ var avroYarnJobSubmissionParameters = new AvroYarnJobSubmissionParameters
+ {
+ jobSubmissionDirectoryPrefix = jobSubmissionDirectory,
+ dfsJobSubmissionFolder = jobSubmissionDirectory,
+ sharedJobSubmissionParameters = avroJobSubmissionParameters
+ };
+
+ return AvroJsonSerializer<AvroYarnJobSubmissionParameters>.ToBytes(avroYarnJobSubmissionParameters);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs
new file mode 100644
index 0000000..e2278e4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.IO;
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.Avro;
+using Org.Apache.REEF.Client.Avro.YARN;
+using Org.Apache.REEF.Client.Yarn;
+using Org.Apache.REEF.Client.YARN.Parameters;
+using Org.Apache.REEF.Common.Avro;
+using Org.Apache.REEF.Common.Files;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Remote.Parameters;
+
+namespace Org.Apache.REEF.Client.YARN
+{
+ /// <summary>
+ /// Job/application parameter file serializer for <see cref="YarnREEFClient"/>.
+ /// </summary>
+ internal sealed class YarnREEFParamSerializer
+ {
+ private readonly REEFFileNames _fileNames;
+ private readonly string _securityTokenKind;
+ private readonly string _securityTokenService;
+ private readonly string _jobSubmissionPrefix;
+
+ [Inject]
+ private YarnREEFParamSerializer(
+ REEFFileNames fileNames,
+ [Parameter(typeof(SecurityTokenKindParameter))] string securityTokenKind,
+ [Parameter(typeof(SecurityTokenServiceParameter))] string securityTokenService,
+ [Parameter(typeof(JobSubmissionDirectoryPrefixParameter))] string jobSubmissionPrefix)
+ {
+ _fileNames = fileNames;
+ _jobSubmissionPrefix = jobSubmissionPrefix;
+ _securityTokenKind = securityTokenKind;
+ _securityTokenService = securityTokenService;
+ }
+
+ /// <summary>
+ /// Serializes the application parameters to app-submission-params.json directly under the given driver folder.
+ /// </summary>
+ internal string SerializeAppFile(IJobSubmission jobSubmission, IInjector paramInjector, string driverFolderPath)
+ {
+ var serializedArgs = SerializeAppArgsToBytes(jobSubmission, paramInjector);
+
+ var submissionArgsFilePath = Path.Combine(driverFolderPath, _fileNames.GetAppSubmissionParametersFile());
+ using (var argsFileStream = new FileStream(submissionArgsFilePath, FileMode.CreateNew))
+ {
+ argsFileStream.Write(serializedArgs, 0, serializedArgs.Length);
+ }
+
+ return submissionArgsFilePath;
+ }
+
+ internal byte[] SerializeAppArgsToBytes(IJobSubmission jobSubmission, IInjector paramInjector)
+ {
+ var avroAppSubmissionParameters = new AvroAppSubmissionParameters
+ {
+ tcpBeginPort = paramInjector.GetNamedInstance<TcpPortRangeStart, int>(),
+ tcpRangeCount = paramInjector.GetNamedInstance<TcpPortRangeCount, int>(),
+ tcpTryCount = paramInjector.GetNamedInstance<TcpPortRangeTryCount, int>()
+ };
+
+ var avroYarnAppSubmissionParameters = new AvroYarnAppSubmissionParameters
+ {
+ sharedAppSubmissionParameters = avroAppSubmissionParameters,
+ driverMemory = jobSubmission.DriverMemory,
+ driverRecoveryTimeout = paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds, int>()
+ };
+
+ var avroYarnClusterAppSubmissionParameters = new AvroYarnClusterAppSubmissionParameters
+ {
+ yarnAppSubmissionParameters = avroYarnAppSubmissionParameters,
+ maxApplicationSubmissions = paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.MaxApplicationSubmissions, int>()
+ };
+
+ return AvroJsonSerializer<AvroYarnClusterAppSubmissionParameters>.ToBytes(avroYarnClusterAppSubmissionParameters);
+ }
+
+ /// <summary>
+ /// Serializes the job parameters to job-submission-params.json.
+ /// </summary>
+ internal string SerializeJobFile(IJobSubmission jobSubmission, string driverFolderPath)
+ {
+ var serializedArgs = SerializeJobArgsToBytes(jobSubmission, driverFolderPath);
+
+ var submissionArgsFilePath = Path.Combine(driverFolderPath, _fileNames.GetJobSubmissionParametersFile());
+ using (var argsFileStream = new FileStream(submissionArgsFilePath, FileMode.CreateNew))
+ {
+ argsFileStream.Write(serializedArgs, 0, serializedArgs.Length);
+ }
+
+ return submissionArgsFilePath;
+ }
+
+ internal byte[] SerializeJobArgsToBytes(IJobSubmission jobSubmission, string driverFolderPath)
+ {
+ var avroJobSubmissionParameters = new AvroJobSubmissionParameters
+ {
+ jobId = jobSubmission.JobIdentifier,
+ jobSubmissionFolder = driverFolderPath
+ };
+
+ var avroYarnJobSubmissionParameters = new AvroYarnJobSubmissionParameters
+ {
+ jobSubmissionDirectoryPrefix = _jobSubmissionPrefix,
+ sharedJobSubmissionParameters = avroJobSubmissionParameters
+ };
+
+ var avroYarnClusterJobSubmissionParameters = new AvroYarnClusterJobSubmissionParameters
+ {
+ securityTokenKind = _securityTokenKind,
+ securityTokenService = _securityTokenService,
+ yarnJobSubmissionParameters = avroYarnJobSubmissionParameters
+ };
+
+ return AvroJsonSerializer<AvroYarnClusterJobSubmissionParameters>.ToBytes(avroYarnClusterJobSubmissionParameters);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs b/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
index 321342b..94e2e6b 100644
--- a/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
@@ -53,6 +53,7 @@ namespace Org.Apache.REEF.Common.Files
private const string BRIDGE_EXE_CONFIG_NAME = "Org.Apache.REEF.Bridge.exe.config";
private const string SECURITY_TOKEN_IDENTIFIER_FILE = "SecurityTokenId";
private const string SECURITY_TOKEN_PASSWORD_FILE = "SecurityTokenPwd";
+ private const string APP_SUBMISSION_PARAMETERS_FILE = "app-submission-params.json";
private const string JOB_SUBMISSION_PARAMETERS_FILE = "job-submission-params.json";
private const string DRIVER_COMMAND_LOGGING_CONFIG = "1> <LOG_DIR>/driver.stdout 2> <LOG_DIR>/driver.stderr";
@@ -243,7 +244,16 @@ namespace Org.Apache.REEF.Common.Files
}
/// <summary>
- /// The Job Submission parameters file that is used to submit a job through Java,
+ /// The Job Submission application parameters file that is used to submit a job through Java,
+ /// either directly or via a "bootstrap" method.
+ /// </summary>
+ public string GetAppSubmissionParametersFile() // returns the APP_SUBMISSION_PARAMETERS_FILE constant: a bare file name, not a full path
+ {
+ return APP_SUBMISSION_PARAMETERS_FILE;
+ }
+
+ /// <summary>
+ /// The Job Submission job parameters file that is used to submit a job through Java,
/// either directly or via a "bootstrap" method.
/// </summary>
public string GetJobSubmissionParametersFile()
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc b/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc
new file mode 100644
index 0000000..30205e1
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+[
+ {
+ "namespace": "org.apache.reef.reef.bridge.client.avro",
+ "type": "record",
+ "name": "AvroAppSubmissionParameters",
+ "doc": "General cross-language application submission parameters shared by all runtimes",
+ "fields": [
+ { "name": "tcpBeginPort", "type": "int" },
+ { "name": "tcpRangeCount", "type": "int" },
+ { "name": "tcpTryCount", "type": "int" }
+ ]
+ },
+ {
+ "namespace": "org.apache.reef.reef.bridge.client.avro",
+ "type": "record",
+ "name": "AvroLocalAppSubmissionParameters",
+ "doc": "Cross-language application submission parameters to the Local runtime",
+ "fields": [
+ { "name": "sharedAppSubmissionParameters", "type": "AvroAppSubmissionParameters" },
+ { "name": "maxNumberOfConcurrentEvaluators", "type": "int" }
+ ]
+ },
+ {
+ "namespace": "org.apache.reef.reef.bridge.client.avro",
+ "type": "record",
+ "name": "AvroYarnAppSubmissionParameters",
+ "doc": "General cross-language application submission parameters to the YARN runtime",
+ "fields": [
+ { "name": "sharedAppSubmissionParameters", "type": "AvroAppSubmissionParameters" },
+ { "name": "driverMemory", "type": "int" },
+ { "name": "driverRecoveryTimeout", "type": "int" }
+ ]
+ },
+ {
+ "namespace": "org.apache.reef.reef.bridge.client.avro",
+ "type": "record",
+ "name": "AvroYarnClusterAppSubmissionParameters",
+ "doc": "Cross-language application submission parameters to the YARN runtime using Hadoop's submission client",
+ "fields": [
+ { "name": "yarnAppSubmissionParameters", "type": "AvroYarnAppSubmissionParameters" },
+ { "name": "maxApplicationSubmissions", "type": "int" }
+ ]
+ }
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
index fec2431..61b9812 100644
--- a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
+++ b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
@@ -21,34 +21,19 @@
"namespace": "org.apache.reef.reef.bridge.client.avro",
"type": "record",
"name": "AvroJobSubmissionParameters",
- "doc": "General cross-language submission parameters shared by all runtimes",
+ "doc": "General cross-language job submission parameters shared by all runtimes",
"fields": [
{ "name": "jobId", "type": "string" },
- { "name": "tcpBeginPort", "type": "int" },
- { "name": "tcpRangeCount", "type": "int" },
- { "name": "tcpTryCount", "type": "int" },
{ "name": "jobSubmissionFolder", "type": "string" }
]
},
{
"namespace": "org.apache.reef.reef.bridge.client.avro",
"type": "record",
- "name": "AvroLocalJobSubmissionParameters",
- "doc": "Cross-language submission parameters to the Local runtime",
- "fields": [
- { "name": "sharedJobSubmissionParameters", "type": "AvroJobSubmissionParameters" },
- { "name": "maxNumberOfConcurrentEvaluators", "type": "int" }
- ]
- },
- {
- "namespace": "org.apache.reef.reef.bridge.client.avro",
- "type": "record",
"name": "AvroYarnJobSubmissionParameters",
"doc": "General cross-language submission parameters to the YARN runtime",
"fields": [
{ "name": "sharedJobSubmissionParameters", "type": "AvroJobSubmissionParameters" },
- { "name": "driverMemory", "type": "int" },
- { "name": "driverRecoveryTimeout", "type": "int" },
{ "name": "dfsJobSubmissionFolder", "type": "string", "default": "NULL" },
{ "name": "jobSubmissionDirectoryPrefix", "type": "string" }
]
@@ -60,7 +45,6 @@
"doc": "Cross-language submission parameters to the YARN runtime using Hadoop's submission client",
"fields": [
{ "name": "yarnJobSubmissionParameters", "type": "AvroYarnJobSubmissionParameters" },
- { "name": "maxApplicationSubmissions", "type": "int" },
{ "name": "securityTokenKind", "type": "string", "default": "NULL" },
{ "name": "securityTokenService", "type": "string", "default": "NULL" }
]
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/JobResourceUploader.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/JobResourceUploader.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/JobResourceUploader.java
index 6ad77c7..e03013a 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/JobResourceUploader.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/JobResourceUploader.java
@@ -20,6 +20,7 @@ package org.apache.reef.bridge.client;
import org.apache.commons.lang.Validate;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
import org.apache.reef.runtime.yarn.YarnClasspathProvider;
@@ -44,16 +45,18 @@ public final class JobResourceUploader {
/**
* This class is invoked from Org.Apache.REEF.Client.Yarn.LegacyJobResourceUploader in .NET code.
* Arguments:
- * [0] : Local path for already generated archive
- * [1] : Path of job submission directory
- * [2] : File path for output with details of uploaded resource
+ * [0] : Local path for file.
+ * [1] : Type for file.
+ * [2] : Path of job submission directory
+ * [3] : File path for output with details of uploaded resource
*/
public static void main(final String[] args) throws InjectionException, IOException {
- Validate.isTrue(args.length == 3, "Job resource uploader requires 3 args");
+ Validate.isTrue(args.length == 4, "Job resource uploader requires 4 args");
final File localFile = new File(args[0]);
- Validate.isTrue(localFile.exists(), "Local archive does not exist " + localFile.getAbsolutePath());
- final String jobSubmissionDirectory = args[1];
- final String localOutputPath = args[2];
+ Validate.isTrue(localFile.exists(), "Local file does not exist " + localFile.getAbsolutePath());
+ final String fileType = args[1];
+ final String jobSubmissionDirectory = args[2];
+ final String localOutputPath = args[3];
LOG.log(Level.INFO, "Received args: LocalPath " + localFile.getAbsolutePath() + " Submission directory " +
jobSubmissionDirectory + " LocalOutputPath " + localOutputPath);
@@ -66,7 +69,7 @@ public final class JobResourceUploader {
.newInjector(configuration)
.getInstance(JobUploader.class);
final LocalResource localResource = jobUploader.createJobFolder(jobSubmissionDirectory)
- .uploadAsLocalResource(localFile);
+ .uploadAsLocalResource(localFile, LocalResourceType.valueOf(fileType));
// Output: <UploadedPath>;<LastModificationUnixTimestamp>;<ResourceSize>
final URL resource = localResource.getResource();
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
index cc308ab..3c94acc 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
@@ -61,12 +61,19 @@ public final class LocalClient {
public static void main(final String[] args) throws IOException, InjectionException {
final File jobSubmissionParametersFile = new File(args[0]);
+ final File localAppSubmissionParametersFile = new File(args[1]);
+
if (!(jobSubmissionParametersFile.exists() && jobSubmissionParametersFile.canRead())) {
throw new IOException("Unable to open and read " + jobSubmissionParametersFile.getAbsolutePath());
}
+ if (!(localAppSubmissionParametersFile.exists() && localAppSubmissionParametersFile.canRead())) {
+ throw new IOException("Unable to open and read " + localAppSubmissionParametersFile.getAbsolutePath());
+ }
+
final LocalSubmissionFromCS localSubmissionFromCS =
- LocalSubmissionFromCS.fromJobSubmissionParametersFile(jobSubmissionParametersFile);
+ LocalSubmissionFromCS.fromSubmissionParameterFiles(
+ jobSubmissionParametersFile, localAppSubmissionParametersFile);
LOG.log(Level.INFO, "Local job submission received from C#: {0}", localSubmissionFromCS);
final Configuration runtimeConfiguration = localSubmissionFromCS.getRuntimeConfiguration();
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
index 93a8f2c..b63e20b 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
@@ -92,13 +92,20 @@ final class LocalRuntimeDriverConfigurationGenerator {
}
public static void main(final String[] args) throws InjectionException, IOException {
- final File jobSubmissionParametersFile = new File(args[0]);
+ final File localAppSubmissionParametersFile = new File(args[0]);
+ final File jobSubmissionParametersFile = new File(args[1]);
+
if (!(jobSubmissionParametersFile.exists() && jobSubmissionParametersFile.canRead())) {
throw new IOException("Unable to open and read " + jobSubmissionParametersFile.getAbsolutePath());
}
+ if (!(localAppSubmissionParametersFile.exists() && localAppSubmissionParametersFile.canRead())) {
+ throw new IOException("Unable to open and read " + localAppSubmissionParametersFile.getAbsolutePath());
+ }
+
final LocalSubmissionFromCS localSubmission =
- LocalSubmissionFromCS.fromJobSubmissionParametersFile(jobSubmissionParametersFile);
+ LocalSubmissionFromCS.fromSubmissionParameterFiles(
+ jobSubmissionParametersFile, localAppSubmissionParametersFile);
LOG.log(Level.FINE, "Local driver config generation received from C#: {0}", localSubmission);
final Configuration localRuntimeConfiguration = localSubmission.getRuntimeConfiguration();
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
index e8d8118..cc5bb82 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
@@ -24,8 +24,9 @@ import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.lang.Validate;
import org.apache.reef.client.parameters.DriverConfigurationProviders;
import org.apache.reef.io.TcpPortConfigurationProvider;
+import org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters;
import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters;
-import org.apache.reef.reef.bridge.client.avro.AvroLocalJobSubmissionParameters;
+import org.apache.reef.reef.bridge.client.avro.AvroLocalAppSubmissionParameters;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix;
import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
@@ -58,16 +59,18 @@ final class LocalSubmissionFromCS {
private final int tcpRangeCount;
private final int tcpTryCount;
- private LocalSubmissionFromCS(final AvroLocalJobSubmissionParameters avroLocalJobSubmissionParameters) {
+ private LocalSubmissionFromCS(final AvroJobSubmissionParameters avroJobSubmissionParameters,
+ final AvroLocalAppSubmissionParameters avroLocalAppSubmissionParameters) {
// We assume the given path to be the one of the driver. The job folder is one level up from there.
- final AvroJobSubmissionParameters jobSubmissionParameters =
- avroLocalJobSubmissionParameters.getSharedJobSubmissionParameters();
- this.driverFolder = new File(jobSubmissionParameters.getJobSubmissionFolder().toString());
- this.jobId = jobSubmissionParameters.getJobId().toString();
- this.maxNumberOfConcurrentEvaluators = avroLocalJobSubmissionParameters.getMaxNumberOfConcurrentEvaluators();
- this.tcpBeginPort = jobSubmissionParameters.getTcpBeginPort();
- this.tcpRangeCount = jobSubmissionParameters.getTcpRangeCount();
- this.tcpTryCount = jobSubmissionParameters.getTcpTryCount();
+ final AvroAppSubmissionParameters appSubmissionParameters =
+ avroLocalAppSubmissionParameters.getSharedAppSubmissionParameters();
+ this.tcpBeginPort = appSubmissionParameters.getTcpBeginPort();
+ this.tcpRangeCount = appSubmissionParameters.getTcpRangeCount();
+ this.tcpTryCount = appSubmissionParameters.getTcpTryCount();
+ this.maxNumberOfConcurrentEvaluators = avroLocalAppSubmissionParameters.getMaxNumberOfConcurrentEvaluators();
+
+ this.driverFolder = new File(avroJobSubmissionParameters.getJobSubmissionFolder().toString());
+ this.jobId = avroJobSubmissionParameters.getJobId().toString();
this.jobFolder = driverFolder.getParentFile();
this.runtimeRootFolder = jobFolder.getParentFile();
@@ -137,16 +140,29 @@ final class LocalSubmissionFromCS {
/**
* Takes the local job submission configuration file, deserializes it, and creates submission object.
*/
- static LocalSubmissionFromCS fromJobSubmissionParametersFile(final File localJobSubmissionParametersFile)
+ static LocalSubmissionFromCS fromSubmissionParameterFiles(final File jobSubmissionParametersFile,
+ final File localAppSubmissionParametersFile)
throws IOException {
- try (final FileInputStream fileInputStream = new FileInputStream(localJobSubmissionParametersFile)) {
+ final AvroLocalAppSubmissionParameters localAppSubmissionParameters;
+
+ final AvroJobSubmissionParameters jobSubmissionParameters;
+
+ try (final FileInputStream fileInputStream = new FileInputStream(jobSubmissionParametersFile)) {
final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
- AvroLocalJobSubmissionParameters.getClassSchema(), fileInputStream);
- final SpecificDatumReader<AvroLocalJobSubmissionParameters> reader =
- new SpecificDatumReader<>(AvroLocalJobSubmissionParameters.class);
- final AvroLocalJobSubmissionParameters localJobSubmissionParameters = reader.read(null, decoder);
+ AvroJobSubmissionParameters.getClassSchema(), fileInputStream);
+ final SpecificDatumReader<AvroJobSubmissionParameters> reader =
+ new SpecificDatumReader<>(AvroJobSubmissionParameters.class);
+ jobSubmissionParameters = reader.read(null, decoder);
+ }
- return new LocalSubmissionFromCS(localJobSubmissionParameters);
+ try (final FileInputStream fileInputStream = new FileInputStream(localAppSubmissionParametersFile)) {
+ final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
+ AvroLocalAppSubmissionParameters.getClassSchema(), fileInputStream);
+ final SpecificDatumReader<AvroLocalAppSubmissionParameters> reader =
+ new SpecificDatumReader<>(AvroLocalAppSubmissionParameters.class);
+ localAppSubmissionParameters = reader.read(null, decoder);
}
+
+ return new LocalSubmissionFromCS(jobSubmissionParameters, localAppSubmissionParameters);
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java
index 23f86ff..c2c5fd3 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java
@@ -25,7 +25,9 @@ import org.apache.reef.client.DriverRestartConfiguration;
import org.apache.reef.client.parameters.DriverConfigurationProviders;
import org.apache.reef.io.TcpPortConfigurationProvider;
import org.apache.reef.javabridge.generic.JobDriver;
+import org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters;
import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters;
+import org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters;
import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
import org.apache.reef.runtime.common.files.REEFFileNames;
@@ -63,31 +65,31 @@ final class YarnBootstrapDriverConfigGenerator {
this.reefFileNames = reefFileNames;
}
- public String writeDriverConfigurationFile(final String bootstrapArgsLocation) throws IOException {
- final File bootstrapArgsFile = new File(bootstrapArgsLocation);
- final AvroYarnJobSubmissionParameters yarnBootstrapArgs =
- readYarnJobSubmissionParametersFromFile(bootstrapArgsFile);
+ public String writeDriverConfigurationFile(final String bootstrapJobArgsLocation,
+ final String bootstrapAppArgsLocation) throws IOException {
+ final File bootstrapJobArgsFile = new File(bootstrapJobArgsLocation).getCanonicalFile();
+ final File bootstrapAppArgsFile = new File(bootstrapAppArgsLocation);
+
+ final AvroYarnJobSubmissionParameters yarnBootstrapJobArgs =
+ readYarnJobSubmissionParametersFromFile(bootstrapJobArgsFile);
+
+ final AvroYarnAppSubmissionParameters yarnBootstrapAppArgs =
+ readYarnAppSubmissionParametersFromFile(bootstrapAppArgsFile);
+
final String driverConfigPath = reefFileNames.getDriverConfigurationPath();
- this.configurationSerializer.toFile(getYarnDriverConfiguration(yarnBootstrapArgs),
+ this.configurationSerializer.toFile(getYarnDriverConfiguration(yarnBootstrapJobArgs, yarnBootstrapAppArgs),
new File(driverConfigPath));
return driverConfigPath;
}
static Configuration getYarnDriverConfiguration(
- final AvroYarnJobSubmissionParameters yarnJobSubmissionParams) {
+ final AvroYarnJobSubmissionParameters yarnJobSubmissionParams,
+ final AvroYarnAppSubmissionParameters yarnAppSubmissionParams) {
+
final AvroJobSubmissionParameters jobSubmissionParameters =
yarnJobSubmissionParams.getSharedJobSubmissionParameters();
- final Configuration providerConfig = Tang.Factory.getTang().newConfigurationBuilder()
- .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class)
- .bindNamedParameter(TcpPortRangeBegin.class, Integer.toString(jobSubmissionParameters.getTcpBeginPort()))
- .bindNamedParameter(TcpPortRangeCount.class, Integer.toString(jobSubmissionParameters.getTcpRangeCount()))
- .bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(jobSubmissionParameters.getTcpTryCount()))
- .bindNamedParameter(JobSubmissionDirectoryPrefix.class,
- yarnJobSubmissionParams.getJobSubmissionDirectoryPrefix().toString())
- .build();
-
final Configuration yarnDriverConfiguration = YarnDriverConfiguration.CONF
.set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY,
yarnJobSubmissionParams.getDfsJobSubmissionFolder().toString())
@@ -96,10 +98,21 @@ final class YarnBootstrapDriverConfigGenerator {
.set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0)
.build();
+ final AvroAppSubmissionParameters appSubmissionParams = yarnAppSubmissionParams.getSharedAppSubmissionParameters();
+
+ final Configuration providerConfig = Tang.Factory.getTang().newConfigurationBuilder()
+ .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class)
+ .bindNamedParameter(TcpPortRangeBegin.class, Integer.toString(appSubmissionParams.getTcpBeginPort()))
+ .bindNamedParameter(TcpPortRangeCount.class, Integer.toString(appSubmissionParams.getTcpRangeCount()))
+ .bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(appSubmissionParams.getTcpTryCount()))
+ .bindNamedParameter(JobSubmissionDirectoryPrefix.class,
+ yarnJobSubmissionParams.getJobSubmissionDirectoryPrefix().toString())
+ .build();
+
final Configuration driverConfiguration = Configurations.merge(
Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER, yarnDriverConfiguration, providerConfig);
- if (yarnJobSubmissionParams.getDriverRecoveryTimeout() > 0) {
+ if (yarnAppSubmissionParams.getDriverRecoveryTimeout() > 0) {
LOG.log(Level.FINE, "Driver restart is enabled.");
final Configuration yarnDriverRestartConfiguration =
@@ -113,7 +126,7 @@ final class YarnBootstrapDriverConfigGenerator {
.set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
JobDriver.DriverRestartRunningTaskHandler.class)
.set(DriverRestartConfiguration.DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS,
- yarnJobSubmissionParams.getDriverRecoveryTimeout())
+ yarnAppSubmissionParams.getDriverRecoveryTimeout())
.set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED,
JobDriver.DriverRestartCompletedHandler.class)
.set(DriverRestartConfiguration.ON_DRIVER_RESTART_EVALUATOR_FAILED,
@@ -126,6 +139,23 @@ final class YarnBootstrapDriverConfigGenerator {
return driverConfiguration;
}
+ static AvroYarnAppSubmissionParameters readYarnAppSubmissionParametersFromFile(final File file) // Opens the given file and deserializes the YARN application submission parameters from it.
+ throws IOException {
+ try (final FileInputStream fileInputStream = new FileInputStream(file)) { // try-with-resources: the stream is closed whether or not decoding succeeds
+ // This is mainly a test hook.
+ return readYarnAppSubmissionParametersFromInputStream(fileInputStream);
+ }
+ }
+
+ static AvroYarnAppSubmissionParameters readYarnAppSubmissionParametersFromInputStream( // Decodes one AvroYarnAppSubmissionParameters record from Avro JSON; the stream is not closed here, so the caller must close it.
+ final InputStream inputStream) throws IOException {
+ final JsonDecoder decoder = DecoderFactory.get().jsonDecoder( // JSON decoder built from the generated class's own schema
+ AvroYarnAppSubmissionParameters.getClassSchema(), inputStream);
+ final SpecificDatumReader<AvroYarnAppSubmissionParameters> reader = new SpecificDatumReader<>(
+ AvroYarnAppSubmissionParameters.class);
+ return reader.read(null, decoder); // null: no existing record instance is supplied for reuse
+ }
+
static AvroYarnJobSubmissionParameters readYarnJobSubmissionParametersFromFile(final File file)
throws IOException {
try (final FileInputStream fileInputStream = new FileInputStream(file)) {
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java
index 5f6b5c2..223c853 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java
@@ -41,9 +41,20 @@ public final class YarnBootstrapREEFLauncher {
public static void main(final String[] args) throws IOException, InjectionException {
LOG.log(Level.INFO, "Entering BootstrapLauncher.main().");
- if (args.length != 1) {
- final String message = "Bootstrap launcher should have a single configuration file input specifying the" +
- " job submission parameters to be deserialized to create the YarnDriverConfiguration on the fly.";
+ if (args.length != 2) {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("[ ");
+ for (String arg : args) {
+ sb.append(arg);
+ sb.append(" ");
+ }
+
+ sb.append("]");
+
+ final String message = "Bootstrap launcher should have two configuration file inputs, one specifying the" +
+ " application submission parameters to be deserialized and the other specifying the job" +
+ " submission parameters to be deserialized to create the YarnDriverConfiguration on the fly." +
+ " Current args are " + sb.toString();
throw fatal(message, new IllegalArgumentException(message));
}
@@ -51,7 +62,7 @@ public final class YarnBootstrapREEFLauncher {
try {
final YarnBootstrapDriverConfigGenerator yarnDriverConfigurationGenerator =
Tang.Factory.getTang().newInjector().getInstance(YarnBootstrapDriverConfigGenerator.class);
- REEFLauncher.main(new String[]{yarnDriverConfigurationGenerator.writeDriverConfigurationFile(args[0])});
+ REEFLauncher.main(new String[]{yarnDriverConfigurationGenerator.writeDriverConfigurationFile(args[0], args[1])});
} catch (final Exception exception) {
if (!(exception instanceof RuntimeException)) {
throw fatal("Failed to initialize configurations.", exception);
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
index 6783aa8..6fa3f83 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
@@ -22,9 +22,7 @@ import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.lang.Validate;
-import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters;
-import org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters;
-import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
+import org.apache.reef.reef.bridge.client.avro.*;
import java.io.File;
import java.io.FileInputStream;
@@ -56,22 +54,29 @@ final class YarnClusterSubmissionFromCS {
private final String tokenKind;
private final String tokenService;
private final String jobSubmissionDirectoryPrefix;
+
+ private final AvroYarnAppSubmissionParameters yarnAppSubmissionParameters;
private final AvroYarnJobSubmissionParameters yarnJobSubmissionParameters;
- private YarnClusterSubmissionFromCS(final AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters) {
+ private YarnClusterSubmissionFromCS(final AvroYarnClusterAppSubmissionParameters yarnClusterAppSubmissionParameters,
+ final AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters) {
yarnJobSubmissionParameters = yarnClusterJobSubmissionParameters.getYarnJobSubmissionParameters();
+ yarnAppSubmissionParameters = yarnClusterAppSubmissionParameters.getYarnAppSubmissionParameters();
final AvroJobSubmissionParameters jobSubmissionParameters =
yarnJobSubmissionParameters.getSharedJobSubmissionParameters();
+ final AvroAppSubmissionParameters appSubmissionParameters =
+ yarnAppSubmissionParameters.getSharedAppSubmissionParameters();
+
this.driverFolder = new File(jobSubmissionParameters.getJobSubmissionFolder().toString());
this.jobId = jobSubmissionParameters.getJobId().toString();
- this.tcpBeginPort = jobSubmissionParameters.getTcpBeginPort();
- this.tcpRangeCount = jobSubmissionParameters.getTcpRangeCount();
- this.tcpTryCount = jobSubmissionParameters.getTcpTryCount();
- this.maxApplicationSubmissions = yarnClusterJobSubmissionParameters.getMaxApplicationSubmissions();
- this.driverRecoveryTimeout = yarnJobSubmissionParameters.getDriverRecoveryTimeout();
- this.driverMemory = yarnJobSubmissionParameters.getDriverMemory();
+ this.tcpBeginPort = appSubmissionParameters.getTcpBeginPort();
+ this.tcpRangeCount = appSubmissionParameters.getTcpRangeCount();
+ this.tcpTryCount = appSubmissionParameters.getTcpTryCount();
+ this.maxApplicationSubmissions = yarnClusterAppSubmissionParameters.getMaxApplicationSubmissions();
+ this.driverRecoveryTimeout = yarnAppSubmissionParameters.getDriverRecoveryTimeout();
+ this.driverMemory = yarnAppSubmissionParameters.getDriverMemory();
this.priority = DEFAULT_PRIORITY;
this.queue = DEFAULT_QUEUE;
this.tokenKind = yarnClusterJobSubmissionParameters.getSecurityTokenKind().toString();
@@ -173,6 +178,13 @@ final class YarnClusterSubmissionFromCS {
}
/**
+ * @return The submission parameters for YARN applications.
+ */
+ AvroYarnAppSubmissionParameters getYarnAppSubmissionParameters() {
+ return yarnAppSubmissionParameters;
+ }
+
+ /**
* @return The submission parameters for YARN jobs.
*/
AvroYarnJobSubmissionParameters getYarnJobSubmissionParameters() {
@@ -189,21 +201,31 @@ final class YarnClusterSubmissionFromCS {
/**
* Takes the YARN cluster job submission configuration file, deserializes it, and creates submission object.
*/
- static YarnClusterSubmissionFromCS fromJobSubmissionParametersFile(final File yarnClusterJobSubmissionParametersFile)
+ static YarnClusterSubmissionFromCS fromJobSubmissionParametersFile(final File yarnClusterAppSubmissionParametersFile,
+ final File yarnClusterJobSubmissionParametersFile)
throws IOException {
- try (final FileInputStream fileInputStream = new FileInputStream(yarnClusterJobSubmissionParametersFile)) {
- // this is mainly a test hook
- return readYarnClusterSubmissionFromCSFromInputStream(fileInputStream);
+ try (final FileInputStream appFileInputStream = new FileInputStream(yarnClusterAppSubmissionParametersFile)) {
+ try (final FileInputStream jobFileInputStream = new FileInputStream(yarnClusterJobSubmissionParametersFile)) {
+ // this is mainly a test hook
+ return readYarnClusterSubmissionFromCSFromInputStream(appFileInputStream, jobFileInputStream);
+ }
}
}
static YarnClusterSubmissionFromCS readYarnClusterSubmissionFromCSFromInputStream(
- final InputStream inputStream) throws IOException {
- final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
- AvroYarnClusterJobSubmissionParameters.getClassSchema(), inputStream);
- final SpecificDatumReader<AvroYarnClusterJobSubmissionParameters> reader = new SpecificDatumReader<>(
+ final InputStream appInputStream, final InputStream jobInputStream) throws IOException {
+ final JsonDecoder appDecoder = DecoderFactory.get().jsonDecoder(
+ AvroYarnClusterAppSubmissionParameters.getClassSchema(), appInputStream);
+ final SpecificDatumReader<AvroYarnClusterAppSubmissionParameters> appReader = new SpecificDatumReader<>(
+ AvroYarnClusterAppSubmissionParameters.class);
+ final AvroYarnClusterAppSubmissionParameters yarnClusterAppSubmissionParameters = appReader.read(null, appDecoder);
+
+ final JsonDecoder jobDecoder = DecoderFactory.get().jsonDecoder(
+ AvroYarnClusterJobSubmissionParameters.getClassSchema(), jobInputStream);
+ final SpecificDatumReader<AvroYarnClusterJobSubmissionParameters> jobReader = new SpecificDatumReader<>(
AvroYarnClusterJobSubmissionParameters.class);
- final AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters = reader.read(null, decoder);
- return new YarnClusterSubmissionFromCS(yarnClusterJobSubmissionParameters);
+ final AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters = jobReader.read(null, jobDecoder);
+
+ return new YarnClusterSubmissionFromCS(yarnClusterAppSubmissionParameters, yarnClusterJobSubmissionParameters);
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index 9250462..d43ee8d 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.reef.runtime.common.files.ClasspathProvider;
@@ -68,7 +69,7 @@ public final class YarnJobSubmissionClient {
private final ClasspathProvider classpath;
private final SecurityTokenProvider tokenProvider;
private final List<String> commandPrefixList;
- private final YarnJobSubmissionParametersFileGenerator jobSubmissionParametersGenerator;
+ private final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator;
@Inject
YarnJobSubmissionClient(final JobUploader uploader,
@@ -78,7 +79,7 @@ public final class YarnJobSubmissionClient {
@Parameter(DriverLaunchCommandPrefix.class)
final List<String> commandPrefixList,
final SecurityTokenProvider tokenProvider,
- final YarnJobSubmissionParametersFileGenerator jobSubmissionParametersGenerator) {
+ final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator) {
this.uploader = uploader;
this.fileNames = fileNames;
this.yarnConfiguration = yarnConfiguration;
@@ -121,13 +122,24 @@ public final class YarnJobSubmissionClient {
// ------------------------------------------------------------------------
// Upload the JAR
LOG.info("Uploading job submission JAR");
- final LocalResource jarFileOnDFS = jobFolderOnDFS.uploadAsLocalResource(jarFile);
+ final LocalResource jarFileOnDFS = jobFolderOnDFS.uploadAsLocalResource(jarFile, LocalResourceType.ARCHIVE);
LOG.info("Uploaded job submission JAR");
// ------------------------------------------------------------------------
+ // Upload the job file
+ final LocalResource jobFileOnDFS = jobFolderOnDFS.uploadAsLocalResource(
+ new File(yarnSubmission.getDriverFolder(), fileNames.getYarnBootstrapJobParamFilePath()),
+ LocalResourceType.FILE);
+
+ final List<String> confFiles = new ArrayList<>();
+ confFiles.add(fileNames.getYarnBootstrapJobParamFilePath());
+ confFiles.add(fileNames.getYarnBootstrapAppParamFilePath());
+
+ // ------------------------------------------------------------------------
// Submit
submissionHelper
.addLocalResource(this.fileNames.getREEFFolderName(), jarFileOnDFS)
+ .addLocalResource(fileNames.getYarnBootstrapJobParamFilePath(), jobFileOnDFS)
.setApplicationName(yarnSubmission.getJobId())
.setDriverMemory(yarnSubmission.getDriverMemory())
.setPriority(yarnSubmission.getPriority())
@@ -135,7 +147,7 @@ public final class YarnJobSubmissionClient {
.setMaxApplicationAttempts(yarnSubmission.getMaxApplicationSubmissions())
.setPreserveEvaluators(yarnSubmission.getDriverRecoveryTimeout() > 0)
.setLauncherClass(YarnBootstrapREEFLauncher.class)
- .setConfigurationFileName(fileNames.getYarnBootstrapParamFilePath())
+ .setConfigurationFilePaths(confFiles)
.submit();
writeDriverHttpEndPoint(yarnSubmission.getDriverFolder(),
submissionHelper.getStringApplicationId(), jobFolderOnDFS.getPath());
@@ -221,16 +233,23 @@ public final class YarnJobSubmissionClient {
/**
* .NET client calls into this main method for job submission.
* For arguments detail:
- * @see YarnClusterSubmissionFromCS#fromJobSubmissionParametersFile(File)
+ * @see YarnClusterSubmissionFromCS#fromJobSubmissionParametersFile(File, File)
*/
public static void main(final String[] args) throws InjectionException, IOException, YarnException {
final File jobSubmissionParametersFile = new File(args[0]);
+ final File appSubmissionParametersFile = new File(args[1]);
+
+ if (!(appSubmissionParametersFile.exists() && appSubmissionParametersFile.canRead())) {
+ throw new IOException("Unable to open and read " + appSubmissionParametersFile.getAbsolutePath());
+ }
+
if (!(jobSubmissionParametersFile.exists() && jobSubmissionParametersFile.canRead())) {
throw new IOException("Unable to open and read " + jobSubmissionParametersFile.getAbsolutePath());
}
final YarnClusterSubmissionFromCS yarnSubmission =
- YarnClusterSubmissionFromCS.fromJobSubmissionParametersFile(jobSubmissionParametersFile);
+ YarnClusterSubmissionFromCS.fromJobSubmissionParametersFile(
+ appSubmissionParametersFile, jobSubmissionParametersFile);
LOG.log(Level.INFO, "YARN job submission received from C#: {0}", yarnSubmission);
if (!yarnSubmission.getTokenKind().equalsIgnoreCase("NULL")) {
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionParametersFileGenerator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionParametersFileGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionParametersFileGenerator.java
deleted file mode 100644
index ccb5e8b..0000000
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionParametersFileGenerator.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.bridge.client;
-
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.io.JsonEncoder;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
-import org.apache.reef.runtime.common.files.REEFFileNames;
-import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
-
-import javax.inject.Inject;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.logging.Logger;
-
-/**
- * Does client side manipulation of driver configuration for YARN runtime.
- */
-final class YarnJobSubmissionParametersFileGenerator {
- private static final Logger LOG = Logger.getLogger(YarnJobSubmissionParametersFileGenerator.class.getName());
- private final REEFFileNames fileNames;
-
- @Inject
- private YarnJobSubmissionParametersFileGenerator(final REEFFileNames fileNames) {
- this.fileNames = fileNames;
- }
-
- /**
- * Writes driver configuration to disk.
- * @param yarnClusterSubmissionFromCS the information needed to submit encode YARN parameters and create the
- * YARN job for submission from the cluster.
- * @throws IOException
- */
- public void writeConfiguration(final YarnClusterSubmissionFromCS yarnClusterSubmissionFromCS,
- final JobFolder jobFolderOnDFS) throws IOException {
- final File yarnParametersFile = new File(yarnClusterSubmissionFromCS.getDriverFolder(),
- fileNames.getYarnBootstrapParamFilePath());
-
- try (final FileOutputStream fileOutputStream = new FileOutputStream(yarnParametersFile)) {
- // this is mainly a test hook.
- writeAvroYarnJobSubmissionParametersToOutputStream(
- yarnClusterSubmissionFromCS, jobFolderOnDFS.getPath().toString(), fileOutputStream);
- }
- }
-
- static void writeAvroYarnJobSubmissionParametersToOutputStream(
- final YarnClusterSubmissionFromCS yarnClusterSubmissionFromCS,
- final String jobFolderOnDFSPath,
- final OutputStream outputStream) throws IOException {
- final DatumWriter<AvroYarnJobSubmissionParameters> datumWriter =
- new SpecificDatumWriter<>(AvroYarnJobSubmissionParameters.class);
-
- final AvroYarnJobSubmissionParameters jobSubmissionParameters =
- yarnClusterSubmissionFromCS.getYarnJobSubmissionParameters();
- jobSubmissionParameters.setDfsJobSubmissionFolder(jobFolderOnDFSPath);
- final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(jobSubmissionParameters.getSchema(),
- outputStream);
- datumWriter.write(jobSubmissionParameters, encoder);
- encoder.flush();
- outputStream.flush();
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionParametersFileGenerator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionParametersFileGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionParametersFileGenerator.java
new file mode 100644
index 0000000..506ced6
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionParametersFileGenerator.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.io.JsonEncoder;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters;
+import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.logging.Logger;
+
+/**
+ * Writes the YARN application and job submission parameters, Avro-encoded as JSON, to files in the driver folder.
+ */
+final class YarnSubmissionParametersFileGenerator {
+ private static final Logger LOG = Logger.getLogger(YarnSubmissionParametersFileGenerator.class.getName());
+ private final REEFFileNames fileNames;
+
+ @Inject
+ private YarnSubmissionParametersFileGenerator(final REEFFileNames fileNames) {
+ this.fileNames = fileNames;
+ }
+
+ /**
+ * Writes the YARN application and job submission parameter files into the submission's driver folder.
+ * @param yarnClusterSubmissionFromCS the submission that supplies the driver folder and the parameters to write.
+ * @param jobFolderOnDFS the job folder on DFS; its path is stored in the job parameters before they are written.
+ * @throws IOException if a parameter file cannot be opened or written.
+ */
+ public void writeConfiguration(final YarnClusterSubmissionFromCS yarnClusterSubmissionFromCS,
+ final JobFolder jobFolderOnDFS) throws IOException {
+ final File yarnAppParametersFile = new File(yarnClusterSubmissionFromCS.getDriverFolder(),
+ fileNames.getYarnBootstrapAppParamFilePath());
+
+ final File yarnJobParametersFile = new File(yarnClusterSubmissionFromCS.getDriverFolder(),
+ fileNames.getYarnBootstrapJobParamFilePath());
+
+ try (final FileOutputStream appFileOutputStream = new FileOutputStream(yarnAppParametersFile)) {
+ try (final FileOutputStream jobFileOutputStream = new FileOutputStream(yarnJobParametersFile)) {
+ // this is mainly a test hook.
+ writeAvroYarnAppSubmissionParametersToOutputStream(yarnClusterSubmissionFromCS, appFileOutputStream);
+ writeAvroYarnJobSubmissionParametersToOutputStream(
+ yarnClusterSubmissionFromCS, jobFolderOnDFS.getPath().toString(), jobFileOutputStream);
+ }
+ }
+ }
+
+ static void writeAvroYarnAppSubmissionParametersToOutputStream(
+ final YarnClusterSubmissionFromCS yarnClusterSubmissionFromCS,
+ final OutputStream outputStream) throws IOException {
+ final DatumWriter<AvroYarnAppSubmissionParameters> datumWriter =
+ new SpecificDatumWriter<>(AvroYarnAppSubmissionParameters.class);
+
+ final AvroYarnAppSubmissionParameters appSubmissionParameters =
+ yarnClusterSubmissionFromCS.getYarnAppSubmissionParameters();
+ final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(appSubmissionParameters.getSchema(), outputStream);
+ datumWriter.write(appSubmissionParameters, encoder);
+ encoder.flush();
+ outputStream.flush();
+ }
+
+ static void writeAvroYarnJobSubmissionParametersToOutputStream(
+ final YarnClusterSubmissionFromCS yarnClusterSubmissionFromCS,
+ final String jobFolderOnDFSPath,
+ final OutputStream outputStream) throws IOException {
+ final DatumWriter<AvroYarnJobSubmissionParameters> datumWriter =
+ new SpecificDatumWriter<>(AvroYarnJobSubmissionParameters.class);
+
+ final AvroYarnJobSubmissionParameters jobSubmissionParameters =
+ yarnClusterSubmissionFromCS.getYarnJobSubmissionParameters();
+ jobSubmissionParameters.setDfsJobSubmissionFolder(jobFolderOnDFSPath);
+ final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(jobSubmissionParameters.getSchema(),
+ outputStream);
+ datumWriter.write(jobSubmissionParameters, encoder);
+ encoder.flush();
+ outputStream.flush();
+ }
+}
[3/3] reef git commit: [REEF-1180] Split REEFClient submission
parameter file into per-application and per-job
Posted by af...@apache.org.
[REEF-1180] Split REEFClient submission parameter file into per-application and per-job
This addresses the issue by:
* Adding and modifying Avro files to reflect job-specific and application-specific parameters.
* Modifying the command line arguments for C# submission.
* Modifying the Java submission client and handlers to reflect the change in submission arguments.
JIRA:
[REEF-1180](https://issues.apache.org/jira/browse/REEF-1180)
Pull Request:
Closes #823
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/1137cdee
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/1137cdee
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/1137cdee
Branch: refs/heads/master
Commit: 1137cdee0e96e85851fc5fa8a2fdac3a87c58a4a
Parents: 20369d4
Author: Andrew Chung <af...@gmail.com>
Authored: Wed Feb 3 16:19:38 2016 -0800
Committer: Andrew Chung <af...@gmail.com>
Committed: Wed Feb 10 10:47:53 2016 -0800
----------------------------------------------------------------------
.../JobResourceUploaderTests.cs | 42 +++-
.../LegacyJobResourceUploaderTests.cs | 42 +++-
.../Org.Apache.REEF.Client.Tests.csproj | 9 +
.../WindowsYarnJobCommandProviderTests.cs | 20 +-
.../YarnREEFParamSerializerTests.cs | 196 +++++++++++++++++++
.../packages.config | 1 +
.../Avro/AvroAppSubmissionParameters.cs | 82 ++++++++
.../Avro/AvroJobSubmissionParameters.cs | 28 +--
.../Local/AvroLocalAppSubmissionParameters.cs | 74 +++++++
.../Local/AvroLocalJobSubmissionParameters.cs | 74 -------
.../YARN/AvroClusterAppSubmissionParameters.cs | 73 +++++++
.../YARN/AvroYarnAppSubmissionParameters.cs | 82 ++++++++
.../AvroYarnClusterJobSubmissionParameters.cs | 12 +-
.../YARN/AvroYarnJobSubmissionParameters.cs | 20 +-
.../Org.Apache.REEF.Client/Local/LocalClient.cs | 42 ++--
.../Org.Apache.REEF.Client.csproj | 7 +-
.../YARN/IJobResourceUploader.cs | 14 +-
.../Org.Apache.REEF.Client/YARN/JobResource.cs | 10 +-
.../YARN/LegacyJobResourceUploader.cs | 61 ++++--
.../YARN/RESTClient/DataModel/LocalResources.cs | 2 +-
.../RESTClient/FileSystemJobResourceUploader.cs | 55 +++++-
.../YARN/WindowsYarnJobCommandProvider.cs | 3 +-
.../YARN/YARNREEFClient.cs | 55 +-----
.../YARN/YarnREEFDotNetClient.cs | 100 ++++------
.../YARN/YarnREEFDotNetParamSerializer.cs | 114 +++++++++++
.../YARN/YarnREEFParamSerializer.cs | 137 +++++++++++++
.../Files/REEFFileNames.cs | 12 +-
.../src/main/avro/AppSubmissionParameters.avsc | 62 ++++++
.../src/main/avro/JobSubmissionParameters.avsc | 18 +-
.../reef/bridge/client/JobResourceUploader.java | 19 +-
.../apache/reef/bridge/client/LocalClient.java | 9 +-
...ocalRuntimeDriverConfigurationGenerator.java | 11 +-
.../bridge/client/LocalSubmissionFromCS.java | 50 +++--
.../YarnBootstrapDriverConfigGenerator.java | 64 ++++--
.../client/YarnBootstrapREEFLauncher.java | 19 +-
.../client/YarnClusterSubmissionFromCS.java | 62 ++++--
.../bridge/client/YarnJobSubmissionClient.java | 31 ++-
...arnJobSubmissionParametersFileGenerator.java | 82 --------
.../YarnSubmissionParametersFileGenerator.java | 103 ++++++++++
...SubmissionParametersSerializationFromCS.java | 97 ++++++---
.../reef/driver/evaluator/CLRProcess.java | 3 +-
.../reef/driver/evaluator/JVMProcess.java | 3 +-
.../runtime/common/files/REEFFileNames.java | 31 ++-
.../common/launch/CLRLaunchCommandBuilder.java | 14 +-
.../common/launch/JavaLaunchCommandBuilder.java | 13 +-
.../common/launch/LaunchCommandBuilder.java | 4 +-
.../launch/JavaLaunchCommandBuilderTest.java | 4 +-
.../client/HDInsightJobSubmissionHandler.java | 3 +-
.../client/PreparedDriverFolderLauncher.java | 3 +-
.../mesos/client/MesosJobSubmissionHandler.java | 3 +-
.../yarn/client/YarnJobSubmissionHandler.java | 4 +-
.../yarn/client/YarnSubmissionHelper.java | 31 ++-
.../runtime/yarn/client/uploader/JobFolder.java | 8 +-
53 files changed, 1573 insertions(+), 545 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs
index e5a6113..5f75d59 100644
--- a/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs
+++ b/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs
@@ -16,6 +16,8 @@
// under the License.
using System;
+using System.Collections.Generic;
+using System.Linq;
using NSubstitute;
using Org.Apache.REEF.Client.Common;
using Org.Apache.REEF.Client.Yarn;
@@ -32,10 +34,13 @@ namespace Org.Apache.REEF.Client.Tests
private const string AnyDriverLocalFolderPath = @"Any\Local\Folder\Path\";
private const string AnyDriverResourceUploadPath = "/vol1/tmp/";
private const string AnyUploadedResourcePath = "/vol1/tmp/Path.zip";
+ private const string AnyJobFileResourcePath = "/vol1/tmp/job-submission-params.json";
private const string AnyHost = "host";
private const string AnyScheme = "hdfs://";
private const string AnyUploadedResourceAbsoluteUri = AnyScheme + AnyHost + AnyUploadedResourcePath;
+ private const string AnyJobFileResourceAbsoluteUri = AnyScheme + AnyHost + AnyJobFileResourcePath;
private const string AnyLocalArchivePath = @"Any\Local\Archive\Path.zip";
+ private const string AnyLocalJobFilePath = @"Any\Local\Folder\Path\job-submission-params.json";
private const long AnyModificationTime = 1447413621;
private const long AnyResourceSize = 53092;
private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, 0);
@@ -52,7 +57,7 @@ namespace Org.Apache.REEF.Client.Tests
var testContext = new TestContext();
var jobResourceUploader = testContext.GetJobResourceUploader();
- jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath);
+ jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath);
// Archive file generator recieved exactly one call with correct driver local folder path
testContext.ResourceArchiveFileGenerator.Received(1).CreateArchiveToUpload(AnyDriverLocalFolderPath);
@@ -64,11 +69,19 @@ namespace Org.Apache.REEF.Client.Tests
var testContext = new TestContext();
var jobResourceUploader = testContext.GetJobResourceUploader();
- var jobResource = jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath);
+ var archiveJobResource = jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath);
+ var fileJobResource = jobResourceUploader.UploadFileResource(AnyLocalJobFilePath, AnyDriverResourceUploadPath);
+ var jobResources = new List<JobResource> { archiveJobResource, fileJobResource };
- Assert.Equal(AnyModificationTime, jobResource.LastModificationUnixTimestamp);
- Assert.Equal(AnyResourceSize, jobResource.ResourceSize);
- Assert.Equal(AnyUploadedResourceAbsoluteUri, jobResource.RemoteUploadPath);
+ foreach (var resource in jobResources)
+ {
+ Assert.Equal(AnyModificationTime, resource.LastModificationUnixTimestamp);
+ Assert.Equal(AnyResourceSize, resource.ResourceSize);
+ }
+
+ var resourcePaths = new HashSet<string>(jobResources.Select(resource => resource.RemoteUploadPath));
+ Assert.True(resourcePaths.Contains(AnyUploadedResourceAbsoluteUri));
+ Assert.True(resourcePaths.Contains(AnyJobFileResourceAbsoluteUri));
}
[Fact]
@@ -77,14 +90,20 @@ namespace Org.Apache.REEF.Client.Tests
var testContext = new TestContext();
var jobResourceUploader = testContext.GetJobResourceUploader();
- jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath);
+ jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath);
+ jobResourceUploader.UploadFileResource(AnyLocalJobFilePath, AnyDriverResourceUploadPath);
- testContext.FileSystem.Received(1).CreateUriForPath(AnyDriverResourceUploadPath);
testContext.FileSystem.Received(1).CreateUriForPath(AnyUploadedResourcePath);
+
+ testContext.FileSystem.Received(2).CreateUriForPath(AnyDriverResourceUploadPath);
testContext.FileSystem.Received(1)
.CopyFromLocal(AnyLocalArchivePath, new Uri(AnyUploadedResourceAbsoluteUri));
- testContext.FileSystem.Received(1)
+ testContext.FileSystem.Received(2)
.CreateDirectory(new Uri(AnyScheme + AnyHost + AnyDriverResourceUploadPath));
+
+ testContext.FileSystem.Received(1).CreateUriForPath(AnyJobFileResourcePath);
+ testContext.FileSystem.Received(1)
+ .CopyFromLocal(AnyLocalJobFilePath, new Uri(AnyJobFileResourceAbsoluteUri));
}
private class TestContext
@@ -98,12 +117,19 @@ namespace Org.Apache.REEF.Client.Tests
var injector = TangFactory.GetTang().NewInjector();
FileSystem.GetFileStatus(new Uri(AnyUploadedResourceAbsoluteUri))
.Returns(new FileStatus(Epoch + TimeSpan.FromSeconds(AnyModificationTime), AnyResourceSize));
+ FileSystem.GetFileStatus(new Uri(AnyJobFileResourceAbsoluteUri))
+ .Returns(new FileStatus(Epoch + TimeSpan.FromSeconds(AnyModificationTime), AnyResourceSize));
ResourceArchiveFileGenerator.CreateArchiveToUpload(AnyDriverLocalFolderPath)
.Returns(AnyLocalArchivePath);
FileSystem.CreateUriForPath(AnyDriverResourceUploadPath)
.Returns(new Uri(AnyScheme + AnyHost + AnyDriverResourceUploadPath));
FileSystem.CreateUriForPath(AnyUploadedResourcePath)
.Returns(new Uri(AnyUploadedResourceAbsoluteUri));
+ FileSystem.CreateUriForPath(AnyJobFileResourcePath)
+ .Returns(new Uri(AnyJobFileResourceAbsoluteUri));
+ IFile file = Substitute.For<IFile>();
+ file.Exists(Arg.Any<string>()).Returns(true);
+ injector.BindVolatileInstance(GenericType<IFile>.Class, file);
injector.BindVolatileInstance(GenericType<IResourceArchiveFileGenerator>.Class, ResourceArchiveFileGenerator);
injector.BindVolatileInstance(GenericType<IFileSystem>.Class, FileSystem);
return injector.GetInstance<FileSystemJobResourceUploader>();
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs
index 2f66638..7cf9c1f 100644
--- a/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs
+++ b/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs
@@ -16,6 +16,7 @@
// under the License.
using System;
+using System.Collections.Generic;
using System.IO;
using NSubstitute;
using Org.Apache.REEF.Client.Common;
@@ -29,8 +30,9 @@ namespace Org.Apache.REEF.Client.Tests
public class LegacyJobResourceUploaderTests
{
private const string AnyDriverLocalFolderPath = @"Any\Local\Folder\Path";
+ private const string AnyLocalJobFilePath = AnyDriverLocalFolderPath + @"\job-submission-params.json";
private const string AnyDriverResourceUploadPath = "/vol1/tmp";
- private const string AnyUploadedResourcePath = "hdfs://foo/vol1/tmp/driver.zip";
+ private const string AnyUploadedResourcePath = "hdfs://foo/vol1/tmp/anyFile";
private const long AnyModificationTime = 1446161745550;
private const long AnyResourceSize = 53092;
@@ -40,7 +42,7 @@ namespace Org.Apache.REEF.Client.Tests
var testContext = new TestContext();
var jobResourceUploader = testContext.GetJobResourceUploader();
- jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath);
+ jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath);
// Archive file generator recieved exactly one call with correct driver local folder path with trailing \
testContext.ResourceArchiveFileGenerator.Received(1).CreateArchiveToUpload(AnyDriverLocalFolderPath + @"\");
@@ -52,22 +54,36 @@ namespace Org.Apache.REEF.Client.Tests
var testContext = new TestContext();
var jobResourceUploader = testContext.GetJobResourceUploader();
const string anyLocalArchivePath = @"Any\Local\Archive\Path.zip";
+ var anyLocalJobFilePath = AnyDriverLocalFolderPath.TrimEnd('\\') + @"\job-submission-params.json";
testContext.ResourceArchiveFileGenerator.CreateArchiveToUpload(AnyDriverLocalFolderPath + @"\")
.Returns(anyLocalArchivePath);
- jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath);
+ jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath);
+ jobResourceUploader.UploadFileResource(AnyLocalJobFilePath, AnyDriverResourceUploadPath);
const string javaClassNameForResourceUploader = @"org.apache.reef.bridge.client.JobResourceUploader";
Guid notUsed;
// Clientlauncher is called with correct class name, local archive path, upload path and temp file.
- testContext.JavaClientLauncher.Received()
+ testContext.JavaClientLauncher.Received(1)
.Launch(javaClassNameForResourceUploader,
anyLocalArchivePath,
+ "ARCHIVE",
AnyDriverResourceUploadPath + "/",
Arg.Is<string>(
outputFilePath =>
Path.GetDirectoryName(outputFilePath) + @"\" == Path.GetTempPath()
&& Guid.TryParse(Path.GetFileName(outputFilePath), out notUsed)));
+
+ // Clientlauncher is called with correct class name, local job file path, upload path and temp file.
+ testContext.JavaClientLauncher.Received(1)
+ .Launch(javaClassNameForResourceUploader,
+ anyLocalJobFilePath,
+ "FILE",
+ AnyDriverResourceUploadPath + "/",
+ Arg.Is<string>(
+ outputFilePath =>
+ Path.GetDirectoryName(outputFilePath) + @"\" == Path.GetTempPath()
+ && Guid.TryParse(Path.GetFileName(outputFilePath), out notUsed)));
}
[Fact]
@@ -77,7 +93,7 @@ namespace Org.Apache.REEF.Client.Tests
var jobResourceUploader = testContext.GetJobResourceUploader(fileExistsReturnValue: false);
// throws filenotfound exception
- Assert.Throws<FileNotFoundException>(() => jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath));
+ Assert.Throws<FileNotFoundException>(() => jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath));
}
[Fact]
@@ -86,11 +102,19 @@ namespace Org.Apache.REEF.Client.Tests
var testContext = new TestContext();
var jobResourceUploader = testContext.GetJobResourceUploader();
- var jobResource = jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath);
+ var jobResources = new List<JobResource>()
+ {
+ jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath),
+ jobResourceUploader.UploadFileResource(AnyLocalJobFilePath, AnyDriverResourceUploadPath)
+ };
- Assert.Equal(AnyModificationTime, jobResource.LastModificationUnixTimestamp);
- Assert.Equal(AnyResourceSize, jobResource.ResourceSize);
- Assert.Equal(AnyUploadedResourcePath, jobResource.RemoteUploadPath);
+ Assert.Equal(jobResources.Count, 2);
+ foreach (var resource in jobResources)
+ {
+ Assert.Equal(AnyModificationTime, resource.LastModificationUnixTimestamp);
+ Assert.Equal(AnyResourceSize, resource.ResourceSize);
+ Assert.Equal(AnyUploadedResourcePath, resource.RemoteUploadPath);
+ }
}
private class TestContext
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj b/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj
index bc47547..c53a380 100644
--- a/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj
@@ -36,6 +36,10 @@ under the License.
<BuildPackage>false</BuildPackage>
</PropertyGroup>
<ItemGroup>
+ <Reference Include="Newtonsoft.Json, Version=6.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
+ <HintPath>..\packages\Newtonsoft.Json.6.0.8\lib\net45\Newtonsoft.Json.dll</HintPath>
+ <Private>True</Private>
+ </Reference>
<Reference Include="NSubstitute, Version=1.8.2.0, Culture=neutral, PublicKeyToken=92dd2e9066daa5ca, processorArchitecture=MSIL">
<HintPath>$(PackagesDir)\NSubstitute.1.8.2.0\lib\net45\NSubstitute.dll</HintPath>
<Private>True</Private>
@@ -71,6 +75,7 @@ under the License.
<Compile Include="YarnClientTests.cs" />
<Compile Include="YarnConfigurationUrlProviderTests.cs" />
<Compile Include="WindowsYarnJobCommandProviderTests.cs" />
+ <Compile Include="YarnREEFParamSerializerTests.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Client\Org.Apache.REEF.Client.csproj">
@@ -89,6 +94,10 @@ under the License.
<Project>{79E7F89A-1DFB-45E1-8D43-D71A954AEB98}</Project>
<Name>Org.Apache.REEF.Utilities</Name>
</ProjectReference>
+ <ProjectReference Include="..\Org.Apache.REEF.Driver\Org.Apache.REEF.Driver.csproj">
+ <Project>{A6BAA2A7-F52F-4329-884E-1BCF711D6805}</Project>
+ <Name>Org.Apache.REEF.Driver</Name>
+ </ProjectReference>
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client.Tests/WindowsYarnJobCommandProviderTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/WindowsYarnJobCommandProviderTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/WindowsYarnJobCommandProviderTests.cs
index a8472ad..86c3c50 100644
--- a/lang/cs/Org.Apache.REEF.Client.Tests/WindowsYarnJobCommandProviderTests.cs
+++ b/lang/cs/Org.Apache.REEF.Client.Tests/WindowsYarnJobCommandProviderTests.cs
@@ -73,8 +73,9 @@ namespace Org.Apache.REEF.Client.Tests
"arn/lib/*;%HADOOP_HOME%/share/hadoop/hdfs/*;%HADOOP_HOME%/share/hadoop" +
"/hdfs/lib/*;%HADOOP_HOME%/share/hadoop/mapreduce/*;%HADOOP_HOME%/share" +
"/hadoop/mapreduce/lib/*;reef/local/*;reef/global/* -Dproc_reef org.apa" +
- "che.reef.bridge.client.YarnBootstrapREEFLauncher reef/local/job-submis" +
- "sion-params.json 1> <LOG_DIR>/driver.stdout 2> <LOG_DIR>/driver.stderr";
+ "che.reef.bridge.client.YarnBootstrapREEFLauncher job-submission-params" +
+ ".json reef/local/app-submission-params.json 1> <LOG_DIR>/driver.stdout" +
+ " 2> <LOG_DIR>/driver.stderr";
var commandBuilder = testContext.GetCommandBuilder();
var jobSubmissionCommand = commandBuilder.GetJobSubmissionCommand();
@@ -97,8 +98,9 @@ namespace Org.Apache.REEF.Client.Tests
"/hdfs/lib/*;%HADOOP_HOME%/share/hadoop/mapreduce/*;%HADOOP_HOME%/share" +
"/hadoop/mapreduce/lib/*;reef/local/*;reef/global/* -Dproc_reef -Djava." +
"util.logging.config.class=org.apache.reef.util.logging.Config org.apac" +
- "he.reef.bridge.client.YarnBootstrapREEFLauncher reef/local/job-submiss" +
- "ion-params.json 1> <LOG_DIR>/driver.stdout 2> <LOG_DIR>/driver.stderr";
+ "he.reef.bridge.client.YarnBootstrapREEFLauncher job-submission-params" +
+ ".json reef/local/app-submission-params.json 1> <LOG_DIR>/driver.stdout" +
+ " 2> <LOG_DIR>/driver.stderr";
var commandBuilder = testContext.GetCommandBuilder(true);
var jobSubmissionCommand = commandBuilder.GetJobSubmissionCommand();
Assert.Equal(expectedCommand, jobSubmissionCommand);
@@ -120,8 +122,9 @@ namespace Org.Apache.REEF.Client.Tests
"arn/lib/*;%HADOOP_HOME%/share/hadoop/hdfs/*;%HADOOP_HOME%/share/hadoop" +
"/hdfs/lib/*;%HADOOP_HOME%/share/hadoop/mapreduce/*;%HADOOP_HOME%/share" +
"/hadoop/mapreduce/lib/*;reef/local/*;reef/global/* -Dproc_reef org.apa" +
- "che.reef.bridge.client.YarnBootstrapREEFLauncher reef/local/job-submis" +
- "sion-params.json 1> <LOG_DIR>/driver.stdout 2> <LOG_DIR>/driver.stderr";
+ "che.reef.bridge.client.YarnBootstrapREEFLauncher job-submission-params" +
+ ".json reef/local/app-submission-params.json 1> <LOG_DIR>/driver.stdout" +
+ " 2> <LOG_DIR>/driver.stderr";
string expectedCommand = string.Format(expectedCommandFormat, sizeMB);
var commandBuilder = testContext.GetCommandBuilder(maxMemAllocPoolSize: sizeMB);
var jobSubmissionCommand = commandBuilder.GetJobSubmissionCommand();
@@ -144,8 +147,9 @@ namespace Org.Apache.REEF.Client.Tests
"arn/lib/*;%HADOOP_HOME%/share/hadoop/hdfs/*;%HADOOP_HOME%/share/hadoop" +
"/hdfs/lib/*;%HADOOP_HOME%/share/hadoop/mapreduce/*;%HADOOP_HOME%/share" +
"/hadoop/mapreduce/lib/*;reef/local/*;reef/global/* -Dproc_reef org.apa" +
- "che.reef.bridge.client.YarnBootstrapREEFLauncher reef/local/job-submis" +
- "sion-params.json 1> <LOG_DIR>/driver.stdout 2> <LOG_DIR>/driver.stderr";
+ "che.reef.bridge.client.YarnBootstrapREEFLauncher job-submission-params" +
+ ".json reef/local/app-submission-params.json 1> <LOG_DIR>/driver.stdout" +
+ " 2> <LOG_DIR>/driver.stderr";
string expectedCommand = string.Format(expectedCommandFormat, sizeMB);
var commandBuilder = testContext.GetCommandBuilder(maxPermSize: sizeMB);
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs
new file mode 100644
index 0000000..4fd1264
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Text;
+using Newtonsoft.Json.Linq;
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.Yarn;
+using Org.Apache.REEF.Client.YARN;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Util;
+using Xunit;
+
+namespace Org.Apache.REEF.Client.Tests
+{
+ public sealed class YarnREEFParamSerializerTests
+ {
+ private const int AnyInt = 1000;
+ private const string AnyString = "Any";
+
+ [Fact]
+ public void TestYarnREEFDotNetAppSerialization()
+ {
+ const string formatStr = "{{" +
+ "\"sharedAppSubmissionParameters\":" +
+ "{{" +
+ "\"tcpBeginPort\":{0}," +
+ "\"tcpRangeCount\":{0}," +
+ "\"tcpTryCount\":{0}" +
+ "}}," +
+ "\"driverMemory\":{0}," +
+ "\"driverRecoveryTimeout\":{0}" +
+ "}}";
+
+ var expectedJson = string.Format(formatStr, AnyInt);
+
+ var tcpConf = TcpPortConfigurationModule.ConfigurationModule
+ .Set(TcpPortConfigurationModule.PortRangeCount, AnyInt.ToString())
+ .Set(TcpPortConfigurationModule.PortRangeStart, AnyInt.ToString())
+ .Set(TcpPortConfigurationModule.PortRangeTryCount, AnyInt.ToString())
+ .Build();
+
+ var driverConf = DriverConfiguration.ConfigurationModule
+ .Set(DriverConfiguration.OnDriverStarted, GenericType<DriverStartHandler>.Class)
+ .Set(DriverConfiguration.DriverRestartEvaluatorRecoverySeconds, AnyInt.ToString())
+ .Build();
+
+ var injector = TangFactory.GetTang().NewInjector(tcpConf, driverConf);
+
+ var serializer = injector.GetInstance<YarnREEFDotNetParamSerializer>();
+ var jobSubmission = injector.GetInstance<JobSubmissionBuilderFactory>()
+ .GetJobSubmissionBuilder().SetDriverMemory(AnyInt).Build();
+
+ var serializedBytes = serializer.SerializeAppArgsToBytes(jobSubmission, injector, AnyString);
+ var jsonObject = JObject.Parse(Encoding.UTF8.GetString(serializedBytes));
+ var expectedJsonObject = JObject.Parse(expectedJson);
+ Assert.True(JToken.DeepEquals(jsonObject, expectedJsonObject));
+ }
+
+ [Fact]
+ public void TestYarnREEFDotNetJobSerialization()
+ {
+ const string formatString =
+ "{{" +
+ "\"sharedJobSubmissionParameters\":" +
+ "{{" +
+ "\"jobId\":\"{0}\"," +
+ "\"jobSubmissionFolder\":\"{0}\"" +
+ "}}," +
+ "\"dfsJobSubmissionFolder\":\"{0}\"," +
+ "\"jobSubmissionDirectoryPrefix\":\"{0}\"" +
+ "}}";
+
+ var expectedJson = string.Format(formatString, AnyString);
+ var injector = TangFactory.GetTang().NewInjector();
+
+ var serializer = injector.GetInstance<YarnREEFDotNetParamSerializer>();
+ var jobSubmission = injector.GetInstance<JobSubmissionBuilderFactory>()
+ .GetJobSubmissionBuilder().SetJobIdentifier(AnyString).Build();
+
+ var serializedBytes = serializer.SerializeJobArgsToBytes(jobSubmission, AnyString, AnyString);
+ var jsonObject = JObject.Parse(Encoding.UTF8.GetString(serializedBytes));
+ var expectedJsonObject = JObject.Parse(expectedJson);
+ Assert.True(JToken.DeepEquals(jsonObject, expectedJsonObject));
+ }
+
+ [Fact]
+ public void TestYarnREEFAppSerialization()
+ {
+ const string formatString = "{{" +
+ "\"yarnAppSubmissionParameters\":" +
+ "{{\"sharedAppSubmissionParameters\":" +
+ "{{\"tcpBeginPort\":{0}," +
+ "\"tcpRangeCount\":{0}," +
+ "\"tcpTryCount\":{0}" +
+ "}}," +
+ "\"driverMemory\":{0}," +
+ "\"driverRecoveryTimeout\":{0}" +
+ "}}," +
+ "\"maxApplicationSubmissions\":{0}" +
+ "}}";
+
+ var expectedJson = string.Format(formatString, AnyInt);
+ var tcpConf = TcpPortConfigurationModule.ConfigurationModule
+ .Set(TcpPortConfigurationModule.PortRangeCount, AnyInt.ToString())
+ .Set(TcpPortConfigurationModule.PortRangeStart, AnyInt.ToString())
+ .Set(TcpPortConfigurationModule.PortRangeTryCount, AnyInt.ToString())
+ .Build();
+
+ var driverConf = DriverConfiguration.ConfigurationModule
+ .Set(DriverConfiguration.OnDriverStarted, GenericType<DriverStartHandler>.Class)
+ .Set(DriverConfiguration.DriverRestartEvaluatorRecoverySeconds, AnyInt.ToString())
+ .Set(DriverConfiguration.MaxApplicationSubmissions, AnyInt.ToString()).Build();
+
+ var injector = TangFactory.GetTang().NewInjector(tcpConf, driverConf);
+
+ var serializer = injector.GetInstance<YarnREEFParamSerializer>();
+ var jobSubmission = injector.GetInstance<JobSubmissionBuilderFactory>()
+ .GetJobSubmissionBuilder().SetDriverMemory(AnyInt).Build();
+
+ var serializedBytes = serializer.SerializeAppArgsToBytes(jobSubmission, injector);
+ var jsonObject = JObject.Parse(Encoding.UTF8.GetString(serializedBytes));
+ var expectedJsonObject = JObject.Parse(expectedJson);
+ Assert.True(JToken.DeepEquals(jsonObject, expectedJsonObject));
+ }
+
+ [Fact]
+ public void TestYarnREEFJobSerialization()
+ {
+ const string formatString =
+ "{{" +
+ "\"yarnJobSubmissionParameters\":" +
+ "{{" +
+ "\"sharedJobSubmissionParameters\":" +
+ "{{" +
+ "\"jobId\":\"{0}\"," +
+ "\"jobSubmissionFolder\":\"{0}\"" +
+ "}},\"dfsJobSubmissionFolder\":\"NULL\"," +
+ "\"jobSubmissionDirectoryPrefix\":\"{0}\"" +
+ "}}," +
+ "\"securityTokenKind\":\"{0}\",\"securityTokenService\":\"{0}\"" +
+ "}}";
+
+ var conf = YARNClientConfiguration.ConfigurationModule
+ .Set(YARNClientConfiguration.SecurityTokenKind, AnyString)
+ .Set(YARNClientConfiguration.SecurityTokenService, AnyString)
+ .Set(YARNClientConfiguration.JobSubmissionFolderPrefix, AnyString)
+ .Build();
+
+ var expectedJson = string.Format(formatString, AnyString);
+ var injector = TangFactory.GetTang().NewInjector(conf);
+
+ var serializer = injector.GetInstance<YarnREEFParamSerializer>();
+ var jobSubmission = injector.GetInstance<JobSubmissionBuilderFactory>()
+ .GetJobSubmissionBuilder().SetJobIdentifier(AnyString).Build();
+
+ var serializedBytes = serializer.SerializeJobArgsToBytes(jobSubmission, AnyString);
+ var jsonObject = JObject.Parse(Encoding.UTF8.GetString(serializedBytes));
+ var expectedJsonObject = JObject.Parse(expectedJson);
+ Assert.True(JToken.DeepEquals(jsonObject, expectedJsonObject));
+ }
+
+ private sealed class DriverStartHandler : IObserver<IDriverStarted>
+ {
+ public void OnNext(IDriverStarted value)
+ {
+ // Intentionally empty.
+ }
+
+ public void OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client.Tests/packages.config
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/packages.config b/lang/cs/Org.Apache.REEF.Client.Tests/packages.config
index 4a71ef7..657c994 100644
--- a/lang/cs/Org.Apache.REEF.Client.Tests/packages.config
+++ b/lang/cs/Org.Apache.REEF.Client.Tests/packages.config
@@ -18,6 +18,7 @@ specific language governing permissions and limitations
under the License.
-->
<packages>
+ <package id="Newtonsoft.Json" version="6.0.8" targetFramework="net45" />
<package id="NSubstitute" version="1.8.2.0" targetFramework="net45" />
<package id="StyleCop.MSBuild" version="4.7.49.1" targetFramework="net45" developmentDependency="true" />
<package id="xunit" version="2.1.0" targetFramework="net45" />
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/Avro/AvroAppSubmissionParameters.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/AvroAppSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/AvroAppSubmissionParameters.cs
new file mode 100644
index 0000000..f43de0d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/AvroAppSubmissionParameters.cs
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Runtime.Serialization;
+using Org.Apache.REEF.Utilities.Attributes;
+
+namespace Org.Apache.REEF.Client.Avro
+{
+ /// <summary>
+ /// Used to serialize and deserialize Avro record org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters.
+ /// This is a (mostly) auto-generated class. For instructions on how to regenerate, please view the README.md in the same folder.
+ /// </summary>
+ [Private]
+ [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
+ public sealed class AvroAppSubmissionParameters
+ {
+ private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters"",""doc"":""General cross-language application submission parameters shared by all runtimes"",""fields"":[{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""}]}";
+
+ /// <summary>
+ /// Gets the schema.
+ /// </summary>
+ public static string Schema
+ {
+ get
+ {
+ return JsonSchema;
+ }
+ }
+
+ /// <summary>
+ /// Gets or sets the tcpBeginPort field.
+ /// </summary>
+ [DataMember]
+ public int tcpBeginPort { get; set; }
+
+ /// <summary>
+ /// Gets or sets the tcpRangeCount field.
+ /// </summary>
+ [DataMember]
+ public int tcpRangeCount { get; set; }
+
+ /// <summary>
+ /// Gets or sets the tcpTryCount field.
+ /// </summary>
+ [DataMember]
+ public int tcpTryCount { get; set; }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AvroAppSubmissionParameters"/> class.
+ /// </summary>
+ public AvroAppSubmissionParameters()
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AvroAppSubmissionParameters"/> class.
+ /// </summary>
+ /// <param name="tcpBeginPort">The tcpBeginPort.</param>
+ /// <param name="tcpRangeCount">The tcpRangeCount.</param>
+ /// <param name="tcpTryCount">The tcpTryCount.</param>
+ public AvroAppSubmissionParameters(int tcpBeginPort, int tcpRangeCount, int tcpTryCount)
+ {
+ this.tcpBeginPort = tcpBeginPort;
+ this.tcpRangeCount = tcpRangeCount;
+ this.tcpTryCount = tcpTryCount;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/Avro/AvroJobSubmissionParameters.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/AvroJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/AvroJobSubmissionParameters.cs
index cb7fafd..88529dd 100644
--- a/lang/cs/Org.Apache.REEF.Client/Avro/AvroJobSubmissionParameters.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/AvroJobSubmissionParameters.cs
@@ -28,7 +28,7 @@ namespace Org.Apache.REEF.Client.Avro
[DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
public sealed class AvroJobSubmissionParameters
{
- private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}";
+ private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}";
/// <summary>
/// Gets the schema.
@@ -48,24 +48,6 @@ namespace Org.Apache.REEF.Client.Avro
public string jobId { get; set; }
/// <summary>
- /// Gets or sets the tcpBeginPort field.
- /// </summary>
- [DataMember]
- public int tcpBeginPort { get; set; }
-
- /// <summary>
- /// Gets or sets the tcpRangeCount field.
- /// </summary>
- [DataMember]
- public int tcpRangeCount { get; set; }
-
- /// <summary>
- /// Gets or sets the tcpTryCount field.
- /// </summary>
- [DataMember]
- public int tcpTryCount { get; set; }
-
- /// <summary>
/// Gets or sets the jobSubmissionFolder field.
/// </summary>
[DataMember]
@@ -82,16 +64,10 @@ namespace Org.Apache.REEF.Client.Avro
/// Initializes a new instance of the <see cref="AvroJobSubmissionParameters"/> class.
/// </summary>
/// <param name="jobId">The jobId.</param>
- /// <param name="tcpBeginPort">The tcpBeginPort.</param>
- /// <param name="tcpRangeCount">The tcpRangeCount.</param>
- /// <param name="tcpTryCount">The tcpTryCount.</param>
/// <param name="jobSubmissionFolder">The jobSubmissionFolder.</param>
- public AvroJobSubmissionParameters(string jobId, int tcpBeginPort, int tcpRangeCount, int tcpTryCount, string jobSubmissionFolder)
+ public AvroJobSubmissionParameters(string jobId, string jobSubmissionFolder)
{
this.jobId = jobId;
- this.tcpBeginPort = tcpBeginPort;
- this.tcpRangeCount = tcpRangeCount;
- this.tcpTryCount = tcpTryCount;
this.jobSubmissionFolder = jobSubmissionFolder;
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalAppSubmissionParameters.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalAppSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalAppSubmissionParameters.cs
new file mode 100644
index 0000000..d4923f4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalAppSubmissionParameters.cs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Runtime.Serialization;
+using Org.Apache.REEF.Utilities.Attributes;
+
+namespace Org.Apache.REEF.Client.Avro.Local
+{
+ /// <summary>
+ /// Used to serialize and deserialize Avro record org.apache.reef.reef.bridge.client.avro.AvroLocalAppSubmissionParameters.
+ /// This is a (mostly) auto-generated class. For instructions on how to regenerate, please view the README.md in the same folder.
+ /// </summary>
+ [Private]
+ [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
+ public sealed class AvroLocalAppSubmissionParameters
+ {
+ private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroLocalAppSubmissionParameters"",""doc"":""Cross-language application submission parameters to the Local runtime"",""fields"":[{""name"":""sharedAppSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters"",""doc"":""General cross-language application submission parameters shared by all runtimes"",""fields"":[{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""}]}},{""name"":""maxNumberOfConcurrentEvaluators"",""type"":""int""}]}";
+
+ /// <summary>
+ /// Gets the schema.
+ /// </summary>
+ public static string Schema
+ {
+ get
+ {
+ return JsonSchema;
+ }
+ }
+
+ /// <summary>
+ /// Gets or sets the sharedAppSubmissionParameters field.
+ /// </summary>
+ [DataMember]
+ public AvroAppSubmissionParameters sharedAppSubmissionParameters { get; set; }
+
+ /// <summary>
+ /// Gets or sets the maxNumberOfConcurrentEvaluators field.
+ /// </summary>
+ [DataMember]
+ public int maxNumberOfConcurrentEvaluators { get; set; }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AvroLocalAppSubmissionParameters"/> class.
+ /// </summary>
+ public AvroLocalAppSubmissionParameters()
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AvroLocalAppSubmissionParameters"/> class.
+ /// </summary>
+ /// <param name="sharedAppSubmissionParameters">The sharedAppSubmissionParameters.</param>
+ /// <param name="maxNumberOfConcurrentEvaluators">The maxNumberOfConcurrentEvaluators.</param>
+ public AvroLocalAppSubmissionParameters(AvroAppSubmissionParameters sharedAppSubmissionParameters, int maxNumberOfConcurrentEvaluators)
+ {
+ this.sharedAppSubmissionParameters = sharedAppSubmissionParameters;
+ this.maxNumberOfConcurrentEvaluators = maxNumberOfConcurrentEvaluators;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalJobSubmissionParameters.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalJobSubmissionParameters.cs
deleted file mode 100644
index bf90109..0000000
--- a/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalJobSubmissionParameters.cs
+++ /dev/null
@@ -1,74 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System.Runtime.Serialization;
-using Org.Apache.REEF.Utilities.Attributes;
-
-namespace Org.Apache.REEF.Client.Avro.Local
-{
- /// <summary>
- /// Used to serialize and deserialize Avro record org.apache.reef.reef.bridge.client.avro.AvroLocalJobSubmissionParameters.
- /// This is a (mostly) auto-generated class. For instructions on how to regenerate, please view the README.md in the same folder.
- /// </summary>
- [Private]
- [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
- public sealed class AvroLocalJobSubmissionParameters
- {
- private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroLocalJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the Local runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""maxNumberOfConcurrentEvaluators"",""type"":""int""}]}";
-
- /// <summary>
- /// Gets the schema.
- /// </summary>
- public static string Schema
- {
- get
- {
- return JsonSchema;
- }
- }
-
- /// <summary>
- /// Gets or sets the sharedJobSubmissionParameters field.
- /// </summary>
- [DataMember]
- public AvroJobSubmissionParameters sharedJobSubmissionParameters { get; set; }
-
- /// <summary>
- /// Gets or sets the maxNumberOfConcurrentEvaluators field.
- /// </summary>
- [DataMember]
- public int maxNumberOfConcurrentEvaluators { get; set; }
-
- /// <summary>
- /// Initializes a new instance of the <see cref="AvroLocalJobSubmissionParameters"/> class.
- /// </summary>
- public AvroLocalJobSubmissionParameters()
- {
- }
-
- /// <summary>
- /// Initializes a new instance of the <see cref="AvroLocalJobSubmissionParameters"/> class.
- /// </summary>
- /// <param name="sharedJobSubmissionParameters">The sharedJobSubmissionParameters.</param>
- /// <param name="maxNumberOfConcurrentEvaluators">The maxNumberOfConcurrentEvaluators.</param>
- public AvroLocalJobSubmissionParameters(AvroJobSubmissionParameters sharedJobSubmissionParameters, int maxNumberOfConcurrentEvaluators)
- {
- this.sharedJobSubmissionParameters = sharedJobSubmissionParameters;
- this.maxNumberOfConcurrentEvaluators = maxNumberOfConcurrentEvaluators;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroClusterAppSubmissionParameters.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroClusterAppSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroClusterAppSubmissionParameters.cs
new file mode 100644
index 0000000..a3d0866
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroClusterAppSubmissionParameters.cs
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Runtime.Serialization;
+using Org.Apache.REEF.Utilities.Attributes;
+
+namespace Org.Apache.REEF.Client.Avro.YARN
+{
+ /// <summary>
+ /// Used to serialize and deserialize Avro record org.apache.reef.reef.bridge.client.avro.AvroYarnClusterAppSubmissionParameters.
+ /// </summary>
+ [Private]
+ [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
+ public sealed class AvroYarnClusterAppSubmissionParameters
+ {
+ private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterAppSubmissionParameters"",""doc"":""Cross-language application submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnAppSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters"",""doc"":""General cross-language application submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedAppSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters"",""doc"":""General cross-language application submission parameters shared by all runtimes"",""fields"":[{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""}]}},{""name"":""driverMemory"",""type"":""int""},{""name"":""driverRecoveryTimeout"",""type"":""int""}]}},{""name"":""maxApplicationSubmissions"",""type"":""int""}]}";
+
+ /// <summary>
+ /// Gets the schema.
+ /// </summary>
+ public static string Schema
+ {
+ get
+ {
+ return JsonSchema;
+ }
+ }
+
+ /// <summary>
+ /// Gets or sets the yarnAppSubmissionParameters field.
+ /// </summary>
+ [DataMember]
+ public AvroYarnAppSubmissionParameters yarnAppSubmissionParameters { get; set; }
+
+ /// <summary>
+ /// Gets or sets the maxApplicationSubmissions field.
+ /// </summary>
+ [DataMember]
+ public int maxApplicationSubmissions { get; set; }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AvroYarnClusterAppSubmissionParameters"/> class.
+ /// </summary>
+ public AvroYarnClusterAppSubmissionParameters()
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AvroYarnClusterAppSubmissionParameters"/> class.
+ /// </summary>
+ /// <param name="yarnAppSubmissionParameters">The yarnAppSubmissionParameters.</param>
+ /// <param name="maxApplicationSubmissions">The maxApplicationSubmissions.</param>
+ public AvroYarnClusterAppSubmissionParameters(AvroYarnAppSubmissionParameters yarnAppSubmissionParameters, int maxApplicationSubmissions)
+ {
+ this.yarnAppSubmissionParameters = yarnAppSubmissionParameters;
+ this.maxApplicationSubmissions = maxApplicationSubmissions;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnAppSubmissionParameters.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnAppSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnAppSubmissionParameters.cs
new file mode 100644
index 0000000..0612800
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnAppSubmissionParameters.cs
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Runtime.Serialization;
+using Org.Apache.REEF.Utilities.Attributes;
+
+namespace Org.Apache.REEF.Client.Avro.YARN
+{
+ /// <summary>
+ /// Used to serialize and deserialize Avro record org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters.
+ /// This is a (mostly) auto-generated class. For instructions on how to regenerate, please view the README.md in the Avro folder of this project.
+ /// </summary>
+ [Private]
+ [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
+ public sealed class AvroYarnAppSubmissionParameters
+ {
+ private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters"",""doc"":""General cross-language application submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedAppSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters"",""doc"":""General cross-language application submission parameters shared by all runtimes"",""fields"":[{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""}]}},{""name"":""driverMemory"",""type"":""int""},{""name"":""driverRecoveryTimeout"",""type"":""int""}]}";
+
+ /// <summary>
+ /// Gets the Avro JSON schema of this record, i.e. the string held in the JsonSchema constant.
+ /// </summary>
+ public static string Schema
+ {
+ get
+ {
+ return JsonSchema;
+ }
+ }
+
+ /// <summary>
+ /// Gets or sets the sharedAppSubmissionParameters field: the nested record of application submission parameters shared by all runtimes.
+ /// </summary>
+ [DataMember]
+ public AvroAppSubmissionParameters sharedAppSubmissionParameters { get; set; }
+
+ /// <summary>
+ /// Gets or sets the driverMemory field (an Avro int; the schema does not state its unit).
+ /// </summary>
+ [DataMember]
+ public int driverMemory { get; set; }
+
+ /// <summary>
+ /// Gets or sets the driverRecoveryTimeout field (an Avro int; the schema does not state its unit).
+ /// </summary>
+ [DataMember]
+ public int driverRecoveryTimeout { get; set; }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AvroYarnAppSubmissionParameters"/> class without assigning any field.
+ /// </summary>
+ public AvroYarnAppSubmissionParameters()
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AvroYarnAppSubmissionParameters"/> class, assigning each field from the matching argument.
+ /// </summary>
+ /// <param name="sharedAppSubmissionParameters">Value for the sharedAppSubmissionParameters field.</param>
+ /// <param name="driverMemory">Value for the driverMemory field.</param>
+ /// <param name="driverRecoveryTimeout">Value for the driverRecoveryTimeout field.</param>
+ public AvroYarnAppSubmissionParameters(AvroAppSubmissionParameters sharedAppSubmissionParameters, int driverMemory, int driverRecoveryTimeout)
+ {
+ this.sharedAppSubmissionParameters = sharedAppSubmissionParameters;
+ this.driverMemory = driverMemory;
+ this.driverRecoveryTimeout = driverRecoveryTimeout;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
index d245461..159c8cf 100644
--- a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
@@ -28,7 +28,7 @@ namespace Org.Apache.REEF.Client.Avro.YARN
[DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
public sealed class AvroYarnClusterJobSubmissionParameters
{
- private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""driverMemory"",""type"":""int""},{""name"":""driverRecoveryTimeout"",""type"":""int""},{""name"":""dfsJobSubmissionFolder"",""type"":[""null"",""string""]},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}},{""name"":""maxApplicationSubmissions"",""type"":""int""},{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""}]}";
+ private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}},{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""}]}";
/// <summary>
/// Gets the schema.
@@ -48,12 +48,6 @@ namespace Org.Apache.REEF.Client.Avro.YARN
public AvroYarnJobSubmissionParameters yarnJobSubmissionParameters { get; set; }
/// <summary>
- /// Gets or sets the maxApplicationSubmissions field.
- /// </summary>
- [DataMember]
- public int maxApplicationSubmissions { get; set; }
-
- /// <summary>
/// Gets or sets the securityTokenKind field.
/// </summary>
[DataMember]
@@ -78,13 +72,11 @@ namespace Org.Apache.REEF.Client.Avro.YARN
/// Initializes a new instance of the <see cref="AvroYarnClusterJobSubmissionParameters"/> class.
/// </summary>
/// <param name="yarnJobSubmissionParameters">The yarnJobSubmissionParameters.</param>
- /// <param name="maxApplicationSubmissions">The maxApplicationSubmissions.</param>
/// <param name="securityTokenKind">The securityTokenKind.</param>
/// <param name="securityTokenService">The securityTokenService.</param>
- public AvroYarnClusterJobSubmissionParameters(AvroYarnJobSubmissionParameters yarnJobSubmissionParameters, int maxApplicationSubmissions, string securityTokenKind, string securityTokenService)
+ public AvroYarnClusterJobSubmissionParameters(AvroYarnJobSubmissionParameters yarnJobSubmissionParameters, string securityTokenKind, string securityTokenService)
{
this.yarnJobSubmissionParameters = yarnJobSubmissionParameters;
- this.maxApplicationSubmissions = maxApplicationSubmissions;
this.securityTokenKind = securityTokenKind;
this.securityTokenService = securityTokenService;
}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs
index 65f96c3..9f03dac 100644
--- a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs
@@ -28,7 +28,7 @@ namespace Org.Apache.REEF.Client.Avro.YARN
[DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
public sealed class AvroYarnJobSubmissionParameters
{
- private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""driverMemory"",""type"":""int""},{""name"":""driverRecoveryTimeout"",""type"":""int""},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}";
+ private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}";
/// <summary>
/// Gets the schema.
@@ -48,18 +48,6 @@ namespace Org.Apache.REEF.Client.Avro.YARN
public AvroJobSubmissionParameters sharedJobSubmissionParameters { get; set; }
/// <summary>
- /// Gets or sets the driverMemory field.
- /// </summary>
- [DataMember]
- public int driverMemory { get; set; }
-
- /// <summary>
- /// Gets or sets the driverRecoveryTimeout field.
- /// </summary>
- [DataMember]
- public int driverRecoveryTimeout { get; set; }
-
- /// <summary>
/// Gets or sets the dfsJobSubmissionFolder field.
/// </summary>
[DataMember]
@@ -83,15 +71,11 @@ namespace Org.Apache.REEF.Client.Avro.YARN
/// Initializes a new instance of the <see cref="AvroYarnJobSubmissionParameters"/> class.
/// </summary>
/// <param name="sharedJobSubmissionParameters">The sharedJobSubmissionParameters.</param>
- /// <param name="driverMemory">The driverMemory.</param>
- /// <param name="driverRecoveryTimeout">The driverRecoveryTimeout.</param>
/// <param name="dfsJobSubmissionFolder">The dfsJobSubmissionFolder.</param>
/// <param name="jobSubmissionDirectoryPrefix">The jobSubmissionDirectoryPrefix.</param>
- public AvroYarnJobSubmissionParameters(AvroJobSubmissionParameters sharedJobSubmissionParameters, int driverMemory, int driverRecoveryTimeout, string dfsJobSubmissionFolder, string jobSubmissionDirectoryPrefix)
+ public AvroYarnJobSubmissionParameters(AvroJobSubmissionParameters sharedJobSubmissionParameters, string dfsJobSubmissionFolder, string jobSubmissionDirectoryPrefix)
{
this.sharedJobSubmissionParameters = sharedJobSubmissionParameters;
- this.driverMemory = driverMemory;
- this.driverRecoveryTimeout = driverRecoveryTimeout;
this.dfsJobSubmissionFolder = dfsJobSubmissionFolder;
this.jobSubmissionDirectoryPrefix = jobSubmissionDirectoryPrefix;
}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
index 0fb9e22..acc2a69 100644
--- a/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
@@ -90,29 +90,45 @@ namespace Org.Apache.REEF.Client.Local
// Intentionally left blank.
}
- private string CreateBootstrapAvroConfig(IJobSubmission jobSubmission, string driverFolder)
+ private string CreateBootstrapAvroJobConfig(IJobSubmission jobSubmission, string driverFolder)
{
- var paramInjector = TangFactory.GetTang().NewInjector(jobSubmission.DriverConfigurations.ToArray());
-
- var bootstrapArgs = new AvroJobSubmissionParameters
+ var bootstrapJobArgs = new AvroJobSubmissionParameters
{
jobSubmissionFolder = driverFolder,
jobId = jobSubmission.JobIdentifier,
+ };
+
+ var submissionArgsFilePath = Path.Combine(driverFolder, _fileNames.GetJobSubmissionParametersFile());
+ using (var argsFileStream = new FileStream(submissionArgsFilePath, FileMode.CreateNew))
+ {
+ var serializedArgs = AvroJsonSerializer<AvroJobSubmissionParameters>.ToBytes(bootstrapJobArgs);
+ argsFileStream.Write(serializedArgs, 0, serializedArgs.Length);
+ }
+
+ return submissionArgsFilePath;
+ }
+
+ private string CreateBootstrapAvroAppConfig(IJobSubmission jobSubmission, string driverFolder)
+ {
+ var paramInjector = TangFactory.GetTang().NewInjector(jobSubmission.DriverConfigurations.ToArray());
+
+ var bootstrapAppArgs = new AvroAppSubmissionParameters
+ {
tcpBeginPort = paramInjector.GetNamedInstance<TcpPortRangeStart, int>(),
tcpRangeCount = paramInjector.GetNamedInstance<TcpPortRangeCount, int>(),
tcpTryCount = paramInjector.GetNamedInstance<TcpPortRangeTryCount, int>(),
};
- var avroLocalBootstrapArgs = new AvroLocalJobSubmissionParameters
+ var avroLocalBootstrapAppArgs = new AvroLocalAppSubmissionParameters
{
- sharedJobSubmissionParameters = bootstrapArgs,
+ sharedAppSubmissionParameters = bootstrapAppArgs,
maxNumberOfConcurrentEvaluators = _maxNumberOfConcurrentEvaluators
};
- var submissionArgsFilePath = Path.Combine(driverFolder, _fileNames.GetJobSubmissionParametersFile());
+ var submissionArgsFilePath = Path.Combine(driverFolder, _fileNames.GetAppSubmissionParametersFile());
using (var argsFileStream = new FileStream(submissionArgsFilePath, FileMode.CreateNew))
{
- var serializedArgs = AvroJsonSerializer<AvroLocalJobSubmissionParameters>.ToBytes(avroLocalBootstrapArgs);
+ var serializedArgs = AvroJsonSerializer<AvroLocalAppSubmissionParameters>.ToBytes(avroLocalBootstrapAppArgs);
argsFileStream.Write(serializedArgs, 0, serializedArgs.Length);
}
@@ -134,17 +150,19 @@ namespace Org.Apache.REEF.Client.Local
public void Submit(IJobSubmission jobSubmission)
{
var driverFolder = PrepareDriverFolder(jobSubmission);
- var submissionArgsFilePath = CreateBootstrapAvroConfig(jobSubmission, driverFolder);
- _javaClientLauncher.Launch(JavaClassName, submissionArgsFilePath);
+ var submissionJobArgsFilePath = CreateBootstrapAvroJobConfig(jobSubmission, driverFolder);
+ var submissionAppArgsFilePath = CreateBootstrapAvroAppConfig(jobSubmission, driverFolder);
+ _javaClientLauncher.Launch(JavaClassName, submissionJobArgsFilePath, submissionAppArgsFilePath);
Logger.Log(Level.Info, "Submitted the Driver for execution.");
}
public IJobSubmissionResult SubmitAndGetJobStatus(IJobSubmission jobSubmission)
{
var driverFolder = PrepareDriverFolder(jobSubmission);
- var submissionArgsFilePath = CreateBootstrapAvroConfig(jobSubmission, driverFolder);
+ var submissionJobArgsFilePath = CreateBootstrapAvroJobConfig(jobSubmission, driverFolder);
+ var submissionAppArgsFilePath = CreateBootstrapAvroAppConfig(jobSubmission, driverFolder);
- Task.Run(() => _javaClientLauncher.Launch(JavaClassName, submissionArgsFilePath));
+ Task.Run(() => _javaClientLauncher.Launch(JavaClassName, submissionJobArgsFilePath, submissionAppArgsFilePath));
var fileName = Path.Combine(driverFolder, _fileNames.DriverHttpEndpoint);
JobSubmissionResult result = new LocalJobSubmissionResult(this, fileName);
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
index fed7dc3..60316e4 100644
--- a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
+++ b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
@@ -69,8 +69,11 @@ under the License.
<Compile Include="API\JobSubmissionBuilder.cs" />
<Compile Include="API\JobSubmissionBuilderFactory.cs" />
<Compile Include="API\TcpPortConfigurationModule.cs" />
+ <Compile Include="Avro\AvroAppSubmissionParameters.cs" />
<Compile Include="Avro\AvroJobSubmissionParameters.cs" />
- <Compile Include="Avro\Local\AvroLocalJobSubmissionParameters.cs" />
+ <Compile Include="Avro\Local\AvroLocalAppSubmissionParameters.cs" />
+ <Compile Include="Avro\YARN\AvroClusterAppSubmissionParameters.cs" />
+ <Compile Include="Avro\YARN\AvroYarnAppSubmissionParameters.cs" />
<Compile Include="Avro\YARN\AvroYarnJobSubmissionParameters.cs" />
<Compile Include="Avro\YARN\AvroYarnClusterJobSubmissionParameters.cs" />
<Compile Include="Common\DotNetFile.cs" />
@@ -158,6 +161,8 @@ under the License.
<Compile Include="YARN\YARNClientConfiguration.cs" />
<Compile Include="YARN\YarnCommandLineEnvironment.cs" />
<Compile Include="YARN\YarnREEFDotNetClient.cs" />
+ <Compile Include="YARN\YarnREEFDotNetParamSerializer.cs" />
+ <Compile Include="YARN\YarnREEFParamSerializer.cs" />
</ItemGroup>
<ItemGroup>
<None Include="Avro\README.md" />
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/IJobResourceUploader.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/IJobResourceUploader.cs b/lang/cs/Org.Apache.REEF.Client/YARN/IJobResourceUploader.cs
index b4b1640..8123f2c 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/IJobResourceUploader.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/IJobResourceUploader.cs
@@ -23,11 +23,19 @@ namespace Org.Apache.REEF.Client.Yarn
public interface IJobResourceUploader
{
/// <summary>
- /// Upload archived local driver folder to DFS destination path.
+ /// Creates the archive from local driver folder and uploads it to DFS destination path.
/// </summary>
/// <param name="driverLocalFolderPath">Local folder where REEF application resources are staged</param>
- /// <param name="jobSubmissionDirectory">Remote directory path where we will upload resources</param>
+ /// <param name="remoteUploadDirectoryPath">Remote directory path where we will upload resources</param>
/// <returns>Path, modification time and size of uploaded file as JobResource</returns>
- JobResource UploadJobResource(string driverLocalFolderPath, string jobSubmissionDirectory);
+ JobResource UploadArchiveResource(string driverLocalFolderPath, string remoteUploadDirectoryPath);
+
+ /// <summary>
+ /// Locates a single file at the given local path and uploads it to the DFS destination directory.
+ /// </summary>
+ /// <param name="fileLocalPath">Local path of the file to upload</param>
+ /// <param name="remoteUploadDirectoryPath">Remote directory path where the file will be uploaded</param>
+ /// <returns>Name, remote path, modification time, size and resource type of the uploaded file as JobResource</returns>
+ JobResource UploadFileResource(string fileLocalPath, string remoteUploadDirectoryPath);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/JobResource.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/JobResource.cs b/lang/cs/Org.Apache.REEF.Client/YARN/JobResource.cs
index ae1a634..7b78dee 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/JobResource.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/JobResource.cs
@@ -15,14 +15,22 @@
// specific language governing permissions and limitations
// under the License.
+using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
+using Org.Apache.REEF.Utilities.Attributes;
+
namespace Org.Apache.REEF.Client.Yarn
{
- public class JobResource
+ [Unstable("New API.")]
+ public sealed class JobResource
{
+ public string Name { get; set; }
+
public string RemoteUploadPath { get; set; }
public long LastModificationUnixTimestamp { get; set; }
public long ResourceSize { get; set; }
+
+ public ResourceType ResourceType { get; set; }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs b/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs
index 5fde77d..94fde87 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs
@@ -16,8 +16,11 @@
// under the License.
using System;
+using System.Collections.Generic;
using System.IO;
using Org.Apache.REEF.Client.Common;
+using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
+using Org.Apache.REEF.Common.Files;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
@@ -42,38 +45,72 @@ namespace Org.Apache.REEF.Client.Yarn
private readonly IJavaClientLauncher _javaLauncher;
private readonly IResourceArchiveFileGenerator _resourceArchiveFileGenerator;
private readonly IFile _file;
+ private readonly REEFFileNames _reefFileNames;
[Inject]
private LegacyJobResourceUploader(
IJavaClientLauncher javaLauncher,
IResourceArchiveFileGenerator resourceArchiveFileGenerator,
IFile file,
- IYarnCommandLineEnvironment yarn)
+ IYarnCommandLineEnvironment yarn,
+ REEFFileNames reefFileNames)
{
_file = file;
_resourceArchiveFileGenerator = resourceArchiveFileGenerator;
_javaLauncher = javaLauncher;
_javaLauncher.AddToClassPath(yarn.GetYarnClasspathList());
+ _reefFileNames = reefFileNames;
}
- public JobResource UploadJobResource(string driverLocalFolderPath, string jobSubmissionDirectory)
+ public JobResource UploadArchiveResource(string driverLocalFolderPath, string remoteUploadDirectoryPath)
{
driverLocalFolderPath = driverLocalFolderPath.TrimEnd('\\') + @"\";
- string driverUploadPath = jobSubmissionDirectory.TrimEnd('/') + @"/";
+ var driverUploadPath = remoteUploadDirectoryPath.TrimEnd('/') + @"/";
Log.Log(Level.Info, "DriverFolderPath: {0} DriverUploadPath: {1}", driverLocalFolderPath, driverUploadPath);
var archivePath = _resourceArchiveFileGenerator.CreateArchiveToUpload(driverLocalFolderPath);
+ return GetJobResource(archivePath, ResourceType.ARCHIVE, driverUploadPath, _reefFileNames.GetReefFolderName());
+ }
- var resourceDetailsOutputPath = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("N"));
- _javaLauncher.Launch(JavaClassNameForResourceUploader,
- archivePath,
- driverUploadPath,
- resourceDetailsOutputPath);
+ public JobResource UploadFileResource(string fileLocalPath, string remoteUploadDirectoryPath)
+ {
+ var driverUploadPath = remoteUploadDirectoryPath.TrimEnd('/') + @"/";
+ var jobArgsFilePath = fileLocalPath;
+ return GetJobResource(jobArgsFilePath, ResourceType.FILE, driverUploadPath);
+ }
- return ParseGeneratedOutputFile(resourceDetailsOutputPath);
+ private JobResource GetJobResource(string filePath, ResourceType resourceType, string driverUploadPath, string localizedName = null)
+ {
+ if (!_file.Exists(filePath))
+ {
+ Exceptions.Throw(
+ new FileNotFoundException("Could not find resource file " + filePath),
+ Log);
+ }
+
+ var detailsOutputPath = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("N"));
+
+ try
+ {
+ _javaLauncher.Launch(JavaClassNameForResourceUploader,
+ filePath,
+ resourceType.ToString(),
+ driverUploadPath,
+ detailsOutputPath);
+
+ var localizedResourceName = localizedName ?? Path.GetFileName(filePath);
+ return ParseGeneratedOutputFile(detailsOutputPath, localizedResourceName, resourceType);
+ }
+ finally
+ {
+ if (_file.Exists(detailsOutputPath))
+ {
+ _file.Delete(detailsOutputPath);
+ }
+ }
}
- private JobResource ParseGeneratedOutputFile(string resourceDetailsOutputPath)
+ private JobResource ParseGeneratedOutputFile(string resourceDetailsOutputPath, string resourceName, ResourceType resourceType)
{
if (!_file.Exists(resourceDetailsOutputPath))
{
@@ -91,9 +128,11 @@ namespace Org.Apache.REEF.Client.Yarn
return new JobResource
{
+ Name = resourceName,
RemoteUploadPath = tokens[0],
LastModificationUnixTimestamp = long.Parse(tokens[1]),
- ResourceSize = long.Parse(tokens[2])
+ ResourceSize = long.Parse(tokens[2]),
+ ResourceType = resourceType
};
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/DataModel/LocalResources.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/DataModel/LocalResources.cs b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/DataModel/LocalResources.cs
index c27d330..999b6b4 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/DataModel/LocalResources.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/DataModel/LocalResources.cs
@@ -56,7 +56,7 @@ namespace Org.Apache.REEF.Client.YARN.RestClient.DataModel
public long Timestamp { get; set; }
}
- internal enum ResourceType
+ public enum ResourceType
{
ARCHIVE,