You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by af...@apache.org on 2016/05/09 21:54:23 UTC
reef git commit: [REEF-1354] Yarn bootstrap for c# multi-runtime
client This addresses the issue by: 1) Adding bootstrap mechanism for
multiruntime
Repository: reef
Updated Branches:
refs/heads/master 913c66cc7 -> cafda7df5
[REEF-1354] Yarn bootstrap for c# multi-runtime client
This addresses the issue by:
1) Adding bootstrap mechanism for multiruntime
JIRA:
[REEF-1354](https://issues.apache.org/jira/browse/REEF-1354)
Pull Request:
Closes #966
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/cafda7df
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/cafda7df
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/cafda7df
Branch: refs/heads/master
Commit: cafda7df5488a02f250379e2da7c445ecc324f82
Parents: 913c66c
Author: Boris Shulman <sh...@gmail.com>
Authored: Sat Apr 23 09:05:04 2016 -0700
Committer: Andrew Chung <af...@gmail.com>
Committed: Mon May 9 14:22:37 2016 -0700
----------------------------------------------------------------------
lang/java/reef-bridge-client/pom.xml | 5 +
.../src/main/avro/AppSubmissionParameters.avsc | 16 +-
...untimeAppSubmissionParametersSerializer.java | 68 +++++
...roYarnJobSubmissionParametersSerializer.java | 69 +++++
...ntimeYarnBootstrapDriverConfigGenerator.java | 287 +++++++++++++++++++
.../MultiRuntimeYarnBootstrapREEFLauncher.java | 83 ++++++
...SubmissionParametersSerializationFromCS.java | 252 +++++++++++++++-
7 files changed, 775 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/cafda7df/lang/java/reef-bridge-client/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/pom.xml b/lang/java/reef-bridge-client/pom.xml
index 1363942..b2bb759 100644
--- a/lang/java/reef-bridge-client/pom.xml
+++ b/lang/java/reef-bridge-client/pom.xml
@@ -80,6 +80,11 @@ under the License.
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-runtime-multi</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- Adding Hadoop to make sure we have it in the shaded jar -->
<dependency>
http://git-wip-us.apache.org/repos/asf/reef/blob/cafda7df/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc b/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc
index d4926e9..7cc52d6 100644
--- a/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc
+++ b/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc
@@ -47,5 +47,19 @@
{ "name": "sharedAppSubmissionParameters", "type": "AvroAppSubmissionParameters" },
{ "name": "driverRecoveryTimeout", "type": "int" }
]
- }
+ },
+ {
+ "namespace": "org.apache.reef.reef.bridge.client.avro",
+ "type": "record",
+ "name": "AvroMultiRuntimeAppSubmissionParameters",
+ "doc": "General cross-language application submission parameters to the YARN runtime",
+ "fields": [
+ { "name": "sharedAppSubmissionParameters", "type": "AvroAppSubmissionParameters" },
+ { "name": "localRuntimeAppParameters", "type": ["null", "AvroLocalAppSubmissionParameters"], "default":
+ null },
+ { "name": "yarnRuntimeAppParameters", "type": ["null", "AvroYarnAppSubmissionParameters"], "default": null },
+ { "name": "defaultRuntimeName", "type":"string", "doc":"The name of the default runtime" },
+ { "name": "runtimes", "type": { "type" :"array", "items": "string"}, "doc":"defined runtimes" }
+ ]
+ }
]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/cafda7df/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AvroMultiRuntimeAppSubmissionParametersSerializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AvroMultiRuntimeAppSubmissionParametersSerializer.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AvroMultiRuntimeAppSubmissionParametersSerializer.java
new file mode 100644
index 0000000..af497cc
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AvroMultiRuntimeAppSubmissionParametersSerializer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.JsonDecoder;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.reef.reef.bridge.client.avro.AvroMultiRuntimeAppSubmissionParameters;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Deserializer for {@link AvroMultiRuntimeAppSubmissionParameters}: decodes the JSON-encoded Avro record
+ * (e.g. as produced by the C# client for multi-runtime submissions) from a file or an input stream.
+ */
+final class AvroMultiRuntimeAppSubmissionParametersSerializer {
+
+  @Inject
+  private AvroMultiRuntimeAppSubmissionParametersSerializer() {
+  }
+
+  /**
+   * Opens {@code file} and decodes its JSON contents into an {@link AvroMultiRuntimeAppSubmissionParameters}.
+   * The file stream is closed before this method returns.
+   *
+   * @param file The JSON-encoded parameters file
+   * @return The decoded Avro object
+   * @throws IOException If the file cannot be opened or read
+   */
+  AvroMultiRuntimeAppSubmissionParameters fromFile(final File file) throws IOException {
+    try (final InputStream in = new FileInputStream(file)) {
+      return fromInputStream(in);
+    }
+  }
+
+  /**
+   * Decodes a JSON-encoded {@link AvroMultiRuntimeAppSubmissionParameters} from {@code inputStream}.
+   * The stream is left open; closing it is the caller's responsibility.
+   * This method is package-private mainly as a test hook, so tests can pass in-memory streams.
+   *
+   * @param inputStream The stream to read the JSON record from
+   * @return The decoded Avro object
+   * @throws IOException If reading from the stream fails
+   */
+  AvroMultiRuntimeAppSubmissionParameters fromInputStream(final InputStream inputStream) throws IOException {
+    final JsonDecoder jsonDecoder =
+        DecoderFactory.get().jsonDecoder(AvroMultiRuntimeAppSubmissionParameters.getClassSchema(), inputStream);
+    final SpecificDatumReader<AvroMultiRuntimeAppSubmissionParameters> datumReader =
+        new SpecificDatumReader<>(AvroMultiRuntimeAppSubmissionParameters.class);
+    return datumReader.read(null, jsonDecoder);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/cafda7df/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AvroYarnJobSubmissionParametersSerializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AvroYarnJobSubmissionParametersSerializer.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AvroYarnJobSubmissionParametersSerializer.java
new file mode 100644
index 0000000..84f0a19
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AvroYarnJobSubmissionParametersSerializer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.JsonDecoder;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Deserializer for {@link AvroYarnJobSubmissionParameters}: decodes the JSON-encoded Avro record
+ * from a file or an input stream.
+ */
+final class AvroYarnJobSubmissionParametersSerializer {
+
+  @Inject
+  private AvroYarnJobSubmissionParametersSerializer() {
+  }
+
+  /**
+   * Opens {@code file} and decodes its JSON contents into an {@link AvroYarnJobSubmissionParameters}.
+   * The file stream is closed before this method returns.
+   *
+   * @param file The JSON-encoded parameters file
+   * @return The decoded Avro object
+   * @throws IOException If the file cannot be opened or read
+   */
+  AvroYarnJobSubmissionParameters fromFile(final File file) throws IOException {
+    try (final InputStream in = new FileInputStream(file)) {
+      return fromInputStream(in);
+    }
+  }
+
+  /**
+   * Decodes a JSON-encoded {@link AvroYarnJobSubmissionParameters} from {@code inputStream}.
+   * The stream is left open; closing it is the caller's responsibility.
+   * This method is package-private mainly as a test hook, so tests can pass in-memory streams.
+   *
+   * @param inputStream The stream to read the JSON record from
+   * @return The decoded Avro object
+   * @throws IOException If reading from the stream fails
+   */
+  AvroYarnJobSubmissionParameters fromInputStream(final InputStream inputStream) throws IOException {
+    final JsonDecoder jsonDecoder =
+        DecoderFactory.get().jsonDecoder(AvroYarnJobSubmissionParameters.getClassSchema(), inputStream);
+    final SpecificDatumReader<AvroYarnJobSubmissionParameters> datumReader =
+        new SpecificDatumReader<>(AvroYarnJobSubmissionParameters.class);
+    return datumReader.read(null, jsonDecoder);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/cafda7df/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/MultiRuntimeYarnBootstrapDriverConfigGenerator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/MultiRuntimeYarnBootstrapDriverConfigGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/MultiRuntimeYarnBootstrapDriverConfigGenerator.java
new file mode 100644
index 0000000..c2fc144
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/MultiRuntimeYarnBootstrapDriverConfigGenerator.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.client.DriverRestartConfiguration;
+import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.io.TcpPortConfigurationProvider;
+import org.apache.reef.javabridge.generic.JobDriver;
+import org.apache.reef.reef.bridge.client.avro.*;
+import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.runtime.local.driver.LocalDriverConfiguration;
+import org.apache.reef.runtime.multi.client.*;
+import org.apache.reef.runtime.multi.driver.MultiRuntimeDriverConfiguration;
+import org.apache.reef.runtime.multi.utils.MultiRuntimeDefinitionSerializer;
+import org.apache.reef.runtime.yarn.YarnClasspathProvider;
+import org.apache.reef.runtime.yarn.driver.*;
+import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix;
+import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount;
+
+import javax.inject.Inject;
+import java.io.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Generates, at runtime on the Driver, the Java Driver configuration for .NET Drivers submitted
+ * through the multi-runtime YARN bootstrap. Called by {@link MultiRuntimeYarnBootstrapREEFLauncher}.
+ */
+@DriverSide
+final class MultiRuntimeYarnBootstrapDriverConfigGenerator {
+  private static final Logger LOG = Logger.getLogger(MultiRuntimeYarnBootstrapDriverConfigGenerator.class.getName());
+
+  /**
+   * Name under which the YARN runtime is registered when the application did not request YARN
+   * as one of its execution runtimes.
+   */
+  private static final String DUMMY_YARN_RUNTIME = "DummyYarnRuntime";
+
+  private final MultiRuntimeDefinitionSerializer runtimeDefinitionSerializer = new MultiRuntimeDefinitionSerializer();
+
+  private final REEFFileNames reefFileNames;
+  private final ConfigurationSerializer configurationSerializer;
+  private final AvroMultiRuntimeAppSubmissionParametersSerializer avroMultiRuntimeAppSubmissionParametersSerializer;
+  private final AvroYarnJobSubmissionParametersSerializer avroYarnJobSubmissionParametersSerializer;
+
+  @Inject
+  private MultiRuntimeYarnBootstrapDriverConfigGenerator(
+      final REEFFileNames reefFileNames,
+      final ConfigurationSerializer configurationSerializer,
+      final AvroMultiRuntimeAppSubmissionParametersSerializer avroMultiRuntimeAppSubmissionParametersSerializer,
+      final AvroYarnJobSubmissionParametersSerializer avroYarnJobSubmissionParametersSerializer) {
+    this.reefFileNames = reefFileNames;
+    this.configurationSerializer = configurationSerializer;
+    this.avroMultiRuntimeAppSubmissionParametersSerializer = avroMultiRuntimeAppSubmissionParametersSerializer;
+    this.avroYarnJobSubmissionParametersSerializer = avroYarnJobSubmissionParametersSerializer;
+  }
+
+  /**
+   * Builds the YARN Driver configuration used for a YARN runtime definition.
+   *
+   * @param yarnJobSubmissionParams YARN job submission parameters (supplies the DFS job submission folder)
+   * @param jobSubmissionParameters Generic job submission parameters (supplies the job id)
+   * @return The YARN Driver configuration
+   */
+  private Configuration createYarnConfiguration(
+      final AvroYarnJobSubmissionParameters yarnJobSubmissionParams,
+      final AvroJobSubmissionParameters jobSubmissionParameters) {
+    return YarnDriverConfiguration.CONF
+        .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY,
+            yarnJobSubmissionParams.getDfsJobSubmissionFolder().toString())
+        .set(YarnDriverConfiguration.JOB_IDENTIFIER,
+            jobSubmissionParameters.getJobId().toString())
+        .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER,
+            ClientRemoteIdentifier.NONE)
+        .set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0)
+        .set(YarnDriverConfiguration.RUNTIME_NAMES, RuntimeIdentifier.RUNTIME_NAME)
+        .build();
+  }
+
+  /**
+   * Adds a YARN runtime definition to the builder under the given runtime name.
+   *
+   * @param yarnJobSubmissionParams YARN job submission parameters
+   * @param jobSubmissionParameters Generic job submission parameters
+   * @param runtimeName The name under which the runtime is registered in the builder
+   * @param builder The multi runtime builder
+   */
+  private void addYarnRuntimeDefinition(
+      final AvroYarnJobSubmissionParameters yarnJobSubmissionParams,
+      final AvroJobSubmissionParameters jobSubmissionParameters,
+      final String runtimeName,
+      final MultiRuntimeDefinitionBuilder builder) {
+    final Configuration yarnDriverConfiguration =
+        createYarnConfiguration(yarnJobSubmissionParams, jobSubmissionParameters);
+    builder.addRuntime(yarnDriverConfiguration, runtimeName);
+  }
+
+  /**
+   * Adds the local runtime definition to the builder.
+   *
+   * @param localAppSubmissionParams Local app submission parameters (supplies the evaluator limit)
+   * @param jobSubmissionParameters Generic job submission parameters (supplies the job id)
+   * @param builder The multi runtime builder
+   */
+  private void addLocalRuntimeDefinition(
+      final AvroLocalAppSubmissionParameters localAppSubmissionParams,
+      final AvroJobSubmissionParameters jobSubmissionParameters,
+      final MultiRuntimeDefinitionBuilder builder) {
+    final Configuration localModule = LocalDriverConfiguration.CONF
+        .set(LocalDriverConfiguration.MAX_NUMBER_OF_EVALUATORS,
+            localAppSubmissionParams.getMaxNumberOfConcurrentEvaluators())
+        // ROOT FOLDER will point to the current runtime directory
+        .set(LocalDriverConfiguration.ROOT_FOLDER, ".")
+        .set(LocalDriverConfiguration.JVM_HEAP_SLACK, 0.0)
+        .set(LocalDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE)
+        .set(LocalDriverConfiguration.JOB_IDENTIFIER,
+            jobSubmissionParameters.getJobId().toString())
+        .set(LocalDriverConfiguration.RUNTIME_NAMES,
+            org.apache.reef.runtime.local.driver.RuntimeIdentifier.RUNTIME_NAME)
+        .build();
+
+    builder.addRuntime(localModule, org.apache.reef.runtime.local.driver.RuntimeIdentifier.RUNTIME_NAME);
+  }
+
+  /**
+   * Builds the complete multi-runtime Driver configuration.
+   *
+   * @param yarnJobSubmissionParams YARN job submission parameters
+   * @param multiruntimeAppSubmissionParams Multi-runtime application submission parameters
+   * @return The merged Driver configuration, including driver-restart configuration when a YARN
+   *         runtime with a positive driver recovery timeout was requested
+   * @throws IllegalArgumentException If neither local nor YARN runtime parameters are provided
+   */
+  private Configuration getMultiRuntimeDriverConfiguration(
+      final AvroYarnJobSubmissionParameters yarnJobSubmissionParams,
+      final AvroMultiRuntimeAppSubmissionParameters multiruntimeAppSubmissionParams) {
+
+    final AvroLocalAppSubmissionParameters localRuntimeAppParams =
+        multiruntimeAppSubmissionParams.getLocalRuntimeAppParameters();
+    final AvroYarnAppSubmissionParameters yarnRuntimeAppParams =
+        multiruntimeAppSubmissionParams.getYarnRuntimeAppParameters();
+
+    if (localRuntimeAppParams == null && yarnRuntimeAppParams == null) {
+      throw new IllegalArgumentException("At least one execution runtime has to be provided");
+    }
+
+    // Shared job parameters (e.g. the job id) come from the YARN job submission parameters.
+    final AvroJobSubmissionParameters jobSubmissionParameters =
+        yarnJobSubmissionParams.getSharedJobSubmissionParameters();
+
+    final MultiRuntimeDefinitionBuilder multiRuntimeDefinitionBuilder = new MultiRuntimeDefinitionBuilder();
+
+    if (localRuntimeAppParams != null) {
+      addLocalRuntimeDefinition(localRuntimeAppParams, jobSubmissionParameters, multiRuntimeDefinitionBuilder);
+    }
+
+    // A YARN runtime is always defined. If YARN was not requested as an execution runtime, it is registered
+    // under a dummy name instead. Per the original design, this still initializes the YARN runtime (so that it
+    // registers with the RM) without it receiving evaluator requests addressed to the YARN runtime name.
+    final String yarnRuntimeName = yarnRuntimeAppParams != null ? RuntimeIdentifier.RUNTIME_NAME : DUMMY_YARN_RUNTIME;
+    addYarnRuntimeDefinition(
+        yarnJobSubmissionParams,
+        jobSubmissionParameters,
+        yarnRuntimeName,
+        multiRuntimeDefinitionBuilder);
+
+    multiRuntimeDefinitionBuilder.setDefaultRuntimeName(
+        multiruntimeAppSubmissionParams.getDefaultRuntimeName().toString());
+
+    // Generate the multi runtime configuration, embedding the serialized runtime definition.
+    ConfigurationModule multiRuntimeDriverConfiguration = MultiRuntimeDriverConfiguration.CONF
+        .set(MultiRuntimeDriverConfiguration.JOB_IDENTIFIER, jobSubmissionParameters.getJobId().toString())
+        .set(MultiRuntimeDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE)
+        .set(MultiRuntimeDriverConfiguration.SERIALIZED_RUNTIME_DEFINITION,
+            this.runtimeDefinitionSerializer.toString(multiRuntimeDefinitionBuilder.build()));
+
+    for (final CharSequence runtimeName : multiruntimeAppSubmissionParams.getRuntimes()) {
+      multiRuntimeDriverConfiguration = multiRuntimeDriverConfiguration.set(
+          MultiRuntimeDriverConfiguration.RUNTIME_NAMES, runtimeName.toString());
+    }
+
+    final AvroAppSubmissionParameters appSubmissionParams =
+        multiruntimeAppSubmissionParams.getSharedAppSubmissionParameters();
+
+    // Generate YARN-related driver configuration (TCP port range, classpath, YARN configuration).
+    final Configuration providerConfig = Tang.Factory.getTang().newConfigurationBuilder()
+        .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class)
+        .bindNamedParameter(TcpPortRangeBegin.class, Integer.toString(appSubmissionParams.getTcpBeginPort()))
+        .bindNamedParameter(TcpPortRangeCount.class, Integer.toString(appSubmissionParams.getTcpRangeCount()))
+        .bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(appSubmissionParams.getTcpTryCount()))
+        .bindNamedParameter(JobSubmissionDirectoryPrefix.class,
+            yarnJobSubmissionParams.getJobSubmissionDirectoryPrefix().toString())
+        .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class)
+        .bindConstructor(YarnConfiguration.class, YarnConfigurationConstructor.class)
+        .build();
+
+    final Configuration driverConfiguration = Configurations.merge(
+        Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER,
+        multiRuntimeDriverConfiguration.build(),
+        providerConfig);
+
+    // Driver restart is only configured for a requested YARN runtime with a positive recovery timeout.
+    if (yarnRuntimeAppParams != null && yarnRuntimeAppParams.getDriverRecoveryTimeout() > 0) {
+      LOG.log(Level.FINE, "Driver restart is enabled.");
+
+      final Configuration yarnDriverRestartConfiguration =
+          YarnDriverRestartConfiguration.CONF.build();
+
+      final Configuration driverRestartConfiguration =
+          DriverRestartConfiguration.CONF
+              .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED, JobDriver.RestartHandler.class)
+              .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE,
+                  JobDriver.DriverRestartActiveContextHandler.class)
+              .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
+                  JobDriver.DriverRestartRunningTaskHandler.class)
+              .set(DriverRestartConfiguration.DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS,
+                  yarnRuntimeAppParams.getDriverRecoveryTimeout())
+              .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED,
+                  JobDriver.DriverRestartCompletedHandler.class)
+              .set(DriverRestartConfiguration.ON_DRIVER_RESTART_EVALUATOR_FAILED,
+                  JobDriver.DriverRestartFailedEvaluatorHandler.class)
+              .build();
+
+      return Configurations.merge(driverConfiguration, yarnDriverRestartConfiguration, driverRestartConfiguration);
+    }
+
+    return driverConfiguration;
+  }
+
+  /**
+   * Reads the job and app argument files, then writes the generated Driver configuration to the
+   * path given by {@link REEFFileNames#getDriverConfigurationPath()}.
+   *
+   * @param bootstrapJobArgsLocation The path of the YARN job submission parameters file
+   * @param bootstrapAppArgsLocation The path of the multi-runtime app submission parameters file
+   * @return The path of the written driver configuration
+   * @throws IOException If an argument file cannot be resolved or read, or the configuration cannot be written
+   */
+  String writeDriverConfigurationFile(final String bootstrapJobArgsLocation,
+                                      final String bootstrapAppArgsLocation) throws IOException {
+    // Resolve both argument files the same way so relative paths are treated consistently.
+    final File bootstrapJobArgsFile = new File(bootstrapJobArgsLocation).getCanonicalFile();
+    final File bootstrapAppArgsFile = new File(bootstrapAppArgsLocation).getCanonicalFile();
+
+    final AvroYarnJobSubmissionParameters yarnBootstrapJobArgs =
+        this.avroYarnJobSubmissionParametersSerializer.fromFile(bootstrapJobArgsFile);
+
+    final AvroMultiRuntimeAppSubmissionParameters multiruntimeBootstrapAppArgs =
+        this.avroMultiRuntimeAppSubmissionParametersSerializer.fromFile(bootstrapAppArgsFile);
+
+    final String driverConfigPath = reefFileNames.getDriverConfigurationPath();
+
+    this.configurationSerializer.toFile(
+        getMultiRuntimeDriverConfiguration(yarnBootstrapJobArgs, multiruntimeBootstrapAppArgs),
+        new File(driverConfigPath));
+
+    return driverConfigPath;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/cafda7df/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/MultiRuntimeYarnBootstrapREEFLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/MultiRuntimeYarnBootstrapREEFLauncher.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/MultiRuntimeYarnBootstrapREEFLauncher.java
new file mode 100644
index 0000000..6e96439
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/MultiRuntimeYarnBootstrapREEFLauncher.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Interop;
+import org.apache.reef.runtime.common.REEFLauncher;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This is a bootstrap launcher for YARN for submission of multiruntime jobs from C#. It allows for Java Driver
+ * configuration generation directly on the Driver without need of Java dependency if REST
+ * submission is used. Note that the name of the class must contain "REEFLauncher" for the time
+ * being in order for the Interop code to discover the class.
+ */
+// TODO[JIRA REEF-1382]: This class does things both for client and driver need to split it
+@Unstable
+@Interop(CppFiles = "DriverLauncher.cpp")
+public final class MultiRuntimeYarnBootstrapREEFLauncher {
+  private static final Logger LOG = Logger.getLogger(MultiRuntimeYarnBootstrapREEFLauncher.class.getName());
+
+  /**
+   * Generates the multi-runtime Driver configuration file and hands its path to {@link REEFLauncher}.
+   *
+   * @param args Exactly two paths: {@code args[0]} is the YARN job submission parameters file and
+   *             {@code args[1]} is the multi-runtime application submission parameters file
+   * @throws IOException Declared for compatibility; checked failures are wrapped in a RuntimeException
+   * @throws InjectionException Declared for compatibility; checked failures are wrapped in a RuntimeException
+   */
+  public static void main(final String[] args) throws IOException, InjectionException {
+    LOG.log(Level.INFO, "Entering MultiRuntimeYarnBootstrapREEFLauncher.main().");
+    if (args.length != 2) {
+      final StringBuilder sb = new StringBuilder();
+      sb.append("[ ");
+      for (final String arg : args) {
+        sb.append(arg);
+        sb.append(" ");
+      }
+
+      sb.append("]");
+      final String message = "Bootstrap launcher should have two configuration file inputs, one specifying the" +
+          " application submission parameters to be deserialized and the other specifying the job" +
+          " submission parameters to be deserialized to create the YarnDriverConfiguration on the fly." +
+          " Current args are " + sb.toString();
+
+      throw fatal(message, new IllegalArgumentException(message));
+    }
+
+    try {
+      final MultiRuntimeYarnBootstrapDriverConfigGenerator driverConfigurationGenerator =
+          Tang.Factory.getTang().newInjector().getInstance(MultiRuntimeYarnBootstrapDriverConfigGenerator.class);
+      REEFLauncher.main(new String[]{driverConfigurationGenerator.writeDriverConfigurationFile(args[0], args[1])});
+    } catch (final RuntimeException e) {
+      // Unchecked exceptions propagate unchanged.
+      throw e;
+    } catch (final Exception e) {
+      // Checked exceptions are logged and wrapped.
+      throw fatal("Failed to initialize configurations.", e);
+    }
+  }
+
+  /**
+   * Logs {@code msg} at SEVERE level and returns a RuntimeException wrapping {@code t} for the caller to throw.
+   */
+  private static RuntimeException fatal(final String msg, final Throwable t) {
+    LOG.log(Level.SEVERE, msg, t);
+    return new RuntimeException(msg, t);
+  }
+
+  private MultiRuntimeYarnBootstrapREEFLauncher() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/cafda7df/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
index afa49be..490ed2e 100644
--- a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
+++ b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
@@ -18,10 +18,7 @@
*/
package org.apache.reef.bridge.client;
-import org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters;
-import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters;
-import org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters;
-import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
+import org.apache.reef.reef.bridge.client.avro.*;
import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix;
@@ -36,6 +33,8 @@ import org.junit.Test;
import java.io.*;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
/**
* Tests for generating Driver configuration by bootstrapping the process to the
@@ -80,6 +79,89 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS {
"\"driverRecoveryTimeout\":" + NUMBER_REP +
"}";
+ /**
+ * JSON text of an {@code AvroMultiRuntimeAppSubmissionParameters} record with BOTH the Local and the Yarn
+ * runtime parameters populated, {@code "Local"} as the default runtime and {@code ["Local", "Yarn"]} as the
+ * runtime list. Every numeric field is set to {@code NUMBER_REP}.
+ * Each per-runtime record is wrapped in an object keyed by its fully-qualified Avro record name. This is how
+ * Avro's JSON encoding represents a non-null union branch; presumably these fields are nullable unions in the
+ * schema (TODO confirm against AppSubmissionParameters.avsc).
+ */
+ private static final String AVRO_YARN_MULTIRUNTIME_APP_PARAMETERS_SERIALIZED_STRING =
+ "{" +
+ "\"sharedAppSubmissionParameters\":" +
+ "{" +
+ "\"tcpBeginPort\":" + NUMBER_REP + "," +
+ "\"tcpRangeCount\":" + NUMBER_REP + "," +
+ "\"tcpTryCount\":" + NUMBER_REP +
+ "}," +
+ "\"localRuntimeAppParameters\":" +
+ "{\"org.apache.reef.reef.bridge.client.avro.AvroLocalAppSubmissionParameters\":" +
+ "{\"sharedAppSubmissionParameters\":" +
+ "{" +
+ "\"tcpBeginPort\":" + NUMBER_REP + "," +
+ "\"tcpRangeCount\":" + NUMBER_REP + "," +
+ "\"tcpTryCount\":" + NUMBER_REP +
+ "}," +
+ "\"maxNumberOfConcurrentEvaluators\":" + NUMBER_REP +
+ "}" +
+ "}," +
+ "\"yarnRuntimeAppParameters\":" +
+ "{\"org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters\":" +
+ "{\"sharedAppSubmissionParameters\":" +
+ "{" +
+ "\"tcpBeginPort\":" + NUMBER_REP + "," +
+ "\"tcpRangeCount\":" + NUMBER_REP + "," +
+ "\"tcpTryCount\":" + NUMBER_REP +
+ "}," +
+ "\"driverRecoveryTimeout\":" + NUMBER_REP +
+ "}" +
+ "}," +
+ "\"defaultRuntimeName\":\"Local\"" + "," +
+ "\"runtimes\":[\"Local\", \"Yarn\" ]" +
+ "}";
+
+ /**
+ * JSON text of an {@code AvroMultiRuntimeAppSubmissionParameters} record in which ONLY the Local runtime is
+ * configured. {@code yarnRuntimeAppParameters} is explicitly {@code null}, the default runtime is
+ * {@code "Local"} and the runtime list is {@code ["Local"]}. Every numeric field is set to {@code NUMBER_REP}.
+ */
+ private static final String AVRO_YARN_MULTIRUNTIME_LOCALONLY_APP_PARAMETERS_SERIALIZED_STRING =
+ "{" +
+ "\"sharedAppSubmissionParameters\":" +
+ "{" +
+ "\"tcpBeginPort\":" + NUMBER_REP + "," +
+ "\"tcpRangeCount\":" + NUMBER_REP + "," +
+ "\"tcpTryCount\":" + NUMBER_REP +
+ "}," +
+ "\"localRuntimeAppParameters\":" +
+ "{\"org.apache.reef.reef.bridge.client.avro.AvroLocalAppSubmissionParameters\":" +
+ "{\"sharedAppSubmissionParameters\":" +
+ "{" +
+ "\"tcpBeginPort\":" + NUMBER_REP + "," +
+ "\"tcpRangeCount\":" + NUMBER_REP + "," +
+ "\"tcpTryCount\":" + NUMBER_REP +
+ "}," +
+ "\"maxNumberOfConcurrentEvaluators\":" + NUMBER_REP +
+ "}" +
+ "}," +
+ "\"yarnRuntimeAppParameters\":null," +
+ "\"defaultRuntimeName\":\"Local\"" + "," +
+ "\"runtimes\":[\"Local\" ]" +
+ "}";
+
+
+ /**
+ * JSON text of an {@code AvroMultiRuntimeAppSubmissionParameters} record in which ONLY the Yarn runtime is
+ * configured. {@code localRuntimeAppParameters} is explicitly {@code null}, the default runtime is
+ * {@code "Yarn"} and the runtime list is {@code ["Yarn"]}. Every numeric field is set to {@code NUMBER_REP}.
+ */
+ private static final String AVRO_YARN_MULTIRUNTIME_YARNONLY_APP_PARAMETERS_SERIALIZED_STRING =
+ "{" +
+ "\"sharedAppSubmissionParameters\":" +
+ "{" +
+ "\"tcpBeginPort\":" + NUMBER_REP + "," +
+ "\"tcpRangeCount\":" + NUMBER_REP + "," +
+ "\"tcpTryCount\":" + NUMBER_REP +
+ "}," +
+ "\"localRuntimeAppParameters\":null," +
+ "\"yarnRuntimeAppParameters\":" +
+ "{\"org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters\":" +
+ "{\"sharedAppSubmissionParameters\":" +
+ "{" +
+ "\"tcpBeginPort\":" + NUMBER_REP + "," +
+ "\"tcpRangeCount\":" + NUMBER_REP + "," +
+ "\"tcpTryCount\":" + NUMBER_REP +
+ "}," +
+ "\"driverRecoveryTimeout\":" + NUMBER_REP +
+ "}" +
+ "}," +
+ "\"defaultRuntimeName\":\"Yarn\"" + "," +
+ "\"runtimes\":[\"Yarn\" ]" +
+ "}";
+
/**
* Tests deserialization of the Avro parameters for submission from the cluster from C#.
* @throws IOException
@@ -111,6 +193,39 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS {
}
/**
+   * Tests deserialization of multi-runtime Avro app parameters from C# that configure both the Local and the
+   * Yarn runtimes, with Local as the default runtime.
+   * @throws IOException if deserializing the parameters fails
+   * @throws InjectionException if instantiating a serializer via Tang fails
+   */
+  @Test
+  public void testAvroMultiruntimeParametersDeserialization() throws IOException, InjectionException {
+    final AvroYarnJobSubmissionParameters jobSubmissionParameters = createAvroYarnJobSubmissionParameters();
+    final AvroMultiRuntimeAppSubmissionParameters appSubmissionParameters =
+        createAvroMultiruntimeAppSubmissionParameters();
+    verifyYarnMultiRuntimeJobSubmissionParams(jobSubmissionParameters, appSubmissionParameters);
+  }
+
+  /**
+   * Tests deserialization of multi-runtime Avro app parameters from C# in which only the Yarn runtime is
+   * configured (no Local runtime parameters), with Yarn as the default runtime.
+   * @throws IOException if deserializing the parameters fails
+   * @throws InjectionException if instantiating a serializer via Tang fails
+   */
+  @Test
+  public void testAvroMultiruntimeYarnOnlyParametersDeserialization() throws IOException, InjectionException {
+    final AvroYarnJobSubmissionParameters jobSubmissionParameters = createAvroYarnJobSubmissionParameters();
+    final AvroMultiRuntimeAppSubmissionParameters appSubmissionParameters =
+        createAvroMultiruntimeYarnOnlyAppSubmissionParameters();
+    verifyYarnOnlyMultiRuntimeJobSubmissionParams(jobSubmissionParameters, appSubmissionParameters);
+  }
+
+  /**
+   * Tests deserialization of multi-runtime Avro app parameters from C# in which only the Local runtime is
+   * configured (no Yarn runtime parameters), with Local as the default runtime.
+   * @throws IOException if deserializing the parameters fails
+   * @throws InjectionException if instantiating a serializer via Tang fails
+   */
+  @Test
+  public void testAvroMultiruntimeLocalOnlyParametersDeserialization() throws IOException, InjectionException {
+    final AvroYarnJobSubmissionParameters jobSubmissionParameters = createAvroYarnJobSubmissionParameters();
+    final AvroMultiRuntimeAppSubmissionParameters appSubmissionParameters =
+        createAvroMultiruntimeLocalOnlyAppSubmissionParameters();
+    verifyLocalOnlyMultiRuntimeJobSubmissionParams(jobSubmissionParameters, appSubmissionParameters);
+  }
+
+ /**
* Tests a round-trip serialization deserialization process of the Avro parameters from C#.
* @throws IOException
*/
@@ -165,6 +280,38 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS {
}
}
+  /**
+   * Deserializes {@code AVRO_YARN_MULTIRUNTIME_APP_PARAMETERS_SERIALIZED_STRING}, which configures both the
+   * Local and the Yarn runtimes.
+   */
+  private static AvroMultiRuntimeAppSubmissionParameters createAvroMultiruntimeAppSubmissionParameters()
+      throws IOException, InjectionException {
+    return deserializeMultiRuntimeAppSubmissionParameters(
+        AVRO_YARN_MULTIRUNTIME_APP_PARAMETERS_SERIALIZED_STRING);
+  }
+
+  /**
+   * Deserializes {@code AVRO_YARN_MULTIRUNTIME_YARNONLY_APP_PARAMETERS_SERIALIZED_STRING}, which configures
+   * only the Yarn runtime.
+   */
+  private static AvroMultiRuntimeAppSubmissionParameters createAvroMultiruntimeYarnOnlyAppSubmissionParameters()
+      throws IOException, InjectionException {
+    return deserializeMultiRuntimeAppSubmissionParameters(
+        AVRO_YARN_MULTIRUNTIME_YARNONLY_APP_PARAMETERS_SERIALIZED_STRING);
+  }
+
+  /**
+   * Deserializes {@code AVRO_YARN_MULTIRUNTIME_LOCALONLY_APP_PARAMETERS_SERIALIZED_STRING}, which configures
+   * only the Local runtime.
+   */
+  private static AvroMultiRuntimeAppSubmissionParameters createAvroMultiruntimeLocalOnlyAppSubmissionParameters()
+      throws IOException, InjectionException {
+    return deserializeMultiRuntimeAppSubmissionParameters(
+        AVRO_YARN_MULTIRUNTIME_LOCALONLY_APP_PARAMETERS_SERIALIZED_STRING);
+  }
+
+  /**
+   * Shared implementation of the three factory methods above. It UTF-8 encodes {@code serialized} and reads it
+   * back with an {@link AvroMultiRuntimeAppSubmissionParametersSerializer} obtained from a fresh Tang injector.
+   *
+   * @param serialized JSON text of an {@link AvroMultiRuntimeAppSubmissionParameters} record
+   * @return the deserialized parameters
+   * @throws IOException if reading or deserializing the stream fails
+   * @throws InjectionException if Tang cannot instantiate the serializer
+   */
+  private static AvroMultiRuntimeAppSubmissionParameters deserializeMultiRuntimeAppSubmissionParameters(
+      final String serialized) throws IOException, InjectionException {
+    try (final InputStream stream = new ByteArrayInputStream(serialized.getBytes(StandardCharsets.UTF_8))) {
+      return Tang.Factory.getTang().newInjector().getInstance(AvroMultiRuntimeAppSubmissionParametersSerializer.class)
+          .fromInputStream(stream);
+    }
+  }
+
private static AvroYarnJobSubmissionParameters createAvroYarnJobSubmissionParameters() throws IOException {
try (final InputStream stream =
new ByteArrayInputStream(AVRO_YARN_JOB_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) {
@@ -200,4 +347,101 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS {
assert jobSubmissionParameters.getDfsJobSubmissionFolder().toString().equals(STRING_REP);
assert jobSubmissionParameters.getJobSubmissionDirectoryPrefix().toString().equals(STRING_REP);
}
+
+  /**
+   * Verifies deserialized parameters for the case where both the Local and the Yarn runtimes are configured
+   * and Local is the default runtime.
+   */
+  private static void verifyYarnMultiRuntimeJobSubmissionParams(
+      final AvroYarnJobSubmissionParameters jobSubmissionParameters,
+      final AvroMultiRuntimeAppSubmissionParameters appSubmissionParameters) {
+    verifySharedMultiRuntimeSubmissionParams(jobSubmissionParameters, appSubmissionParameters);
+
+    assert appSubmissionParameters.getLocalRuntimeAppParameters() != null;
+    assert appSubmissionParameters.getLocalRuntimeAppParameters()
+        .getMaxNumberOfConcurrentEvaluators() == NUMBER_REP;
+    assert appSubmissionParameters.getYarnRuntimeAppParameters() != null;
+    assert appSubmissionParameters.getYarnRuntimeAppParameters().getDriverRecoveryTimeout() == NUMBER_REP;
+    assert appSubmissionParameters.getDefaultRuntimeName().toString().equals("Local");
+
+    final List<String> runtimeNames = getRuntimeNames(appSubmissionParameters);
+    assert runtimeNames.size() == 2;
+    assert runtimeNames.contains("Local");
+    assert runtimeNames.contains("Yarn");
+  }
+
+  /**
+   * Verifies deserialized parameters for the case where only the Yarn runtime is configured and is the
+   * default runtime.
+   */
+  private static void verifyYarnOnlyMultiRuntimeJobSubmissionParams(
+      final AvroYarnJobSubmissionParameters jobSubmissionParameters,
+      final AvroMultiRuntimeAppSubmissionParameters appSubmissionParameters) {
+    verifySharedMultiRuntimeSubmissionParams(jobSubmissionParameters, appSubmissionParameters);
+
+    assert appSubmissionParameters.getLocalRuntimeAppParameters() == null;
+    assert appSubmissionParameters.getYarnRuntimeAppParameters() != null;
+    assert appSubmissionParameters.getYarnRuntimeAppParameters().getDriverRecoveryTimeout() == NUMBER_REP;
+    assert appSubmissionParameters.getDefaultRuntimeName().toString().equals("Yarn");
+
+    final List<String> runtimeNames = getRuntimeNames(appSubmissionParameters);
+    assert runtimeNames.size() == 1;
+    assert runtimeNames.contains("Yarn");
+  }
+
+  /**
+   * Verifies deserialized parameters for the case where only the Local runtime is configured and is the
+   * default runtime.
+   */
+  private static void verifyLocalOnlyMultiRuntimeJobSubmissionParams(
+      final AvroYarnJobSubmissionParameters jobSubmissionParameters,
+      final AvroMultiRuntimeAppSubmissionParameters appSubmissionParameters) {
+    verifySharedMultiRuntimeSubmissionParams(jobSubmissionParameters, appSubmissionParameters);
+
+    assert appSubmissionParameters.getLocalRuntimeAppParameters() != null;
+    assert appSubmissionParameters.getLocalRuntimeAppParameters()
+        .getMaxNumberOfConcurrentEvaluators() == NUMBER_REP;
+    assert appSubmissionParameters.getYarnRuntimeAppParameters() == null;
+    assert appSubmissionParameters.getDefaultRuntimeName().toString().equals("Local");
+
+    final List<String> runtimeNames = getRuntimeNames(appSubmissionParameters);
+    assert runtimeNames.size() == 1;
+    assert runtimeNames.contains("Local");
+  }
+
+  /**
+   * Verifies the job-level parameters and the shared TCP app parameters. These are identical in every
+   * multi-runtime test case: {@code NUMBER_REP} for numbers and {@code STRING_REP} for strings.
+   */
+  private static void verifySharedMultiRuntimeSubmissionParams(
+      final AvroYarnJobSubmissionParameters jobSubmissionParameters,
+      final AvroMultiRuntimeAppSubmissionParameters appSubmissionParameters) {
+    final AvroAppSubmissionParameters sharedAppSubmissionParams =
+        appSubmissionParameters.getSharedAppSubmissionParameters();
+    final AvroJobSubmissionParameters sharedJobSubmissionParams =
+        jobSubmissionParameters.getSharedJobSubmissionParameters();
+
+    assert sharedAppSubmissionParams.getTcpBeginPort() == NUMBER_REP;
+    assert sharedAppSubmissionParams.getTcpRangeCount() == NUMBER_REP;
+    assert sharedAppSubmissionParams.getTcpTryCount() == NUMBER_REP;
+    assert sharedJobSubmissionParams.getJobId().toString().equals(STRING_REP);
+    assert sharedJobSubmissionParams.getJobSubmissionFolder().toString().equals(STRING_REP);
+    assert jobSubmissionParameters.getDfsJobSubmissionFolder().toString().equals(STRING_REP);
+    assert jobSubmissionParameters.getJobSubmissionDirectoryPrefix().toString().equals(STRING_REP);
+  }
+
+  /**
+   * Copies the runtime names, which the Avro record exposes as {@link CharSequence}s, into {@link String}s.
+   * This lets them be compared against {@link String} literals with {@link List#contains}.
+   */
+  private static List<String> getRuntimeNames(
+      final AvroMultiRuntimeAppSubmissionParameters appSubmissionParameters) {
+    final List<String> runtimeNames = new ArrayList<>(appSubmissionParameters.getRuntimes().size());
+    for (final CharSequence runtimeName : appSubmissionParameters.getRuntimes()) {
+      runtimeNames.add(runtimeName.toString());
+    }
+    return runtimeNames;
+  }
}