You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by af...@apache.org on 2015/10/27 23:05:53 UTC
incubator-reef git commit: [REEF-866] Refactor bridge clients to
separate driver config generation from application submission
Repository: incubator-reef
Updated Branches:
refs/heads/master db0462047 -> ebbbeacc8
[REEF-866] Refactor bridge clients to separate driver config generation from application submission
JIRA:
[REEF-866](https://issues.apache.org/jira/browse/REEF-866)
Pull Request:
Closes #585
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/ebbbeacc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/ebbbeacc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/ebbbeacc
Branch: refs/heads/master
Commit: ebbbeacc862e205c3ee05c41f22d061d0b9cd57e
Parents: db04620
Author: Anupam <an...@gmail.com>
Authored: Thu Oct 22 19:19:40 2015 -0700
Committer: Andrew Chung <af...@gmail.com>
Committed: Tue Oct 27 15:03:25 2015 -0700
----------------------------------------------------------------------
.../apache/reef/bridge/client/LocalClient.java | 47 +-----
...ocalRuntimeDriverConfigurationGenerator.java | 104 ++++++++++++
.../YarnDriverConfigurationGenerator.java | 160 +++++++++++++++++++
.../bridge/client/YarnJobSubmissionClient.java | 112 ++-----------
.../bridge/client/YarnSubmissionFromCS.java | 2 +-
5 files changed, 290 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ebbbeacc/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
index 7a08ee2..e80164b 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
@@ -18,21 +18,15 @@
*/
package org.apache.reef.bridge.client;
-import org.apache.reef.client.parameters.DriverConfigurationProviders;
-import org.apache.reef.driver.parameters.JobSubmissionDirectory;
import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
-import org.apache.reef.runtime.common.files.REEFFileNames;
-import org.apache.reef.runtime.local.client.DriverConfigurationProvider;
import org.apache.reef.runtime.local.client.PreparedDriverFolderLauncher;
-import org.apache.reef.tang.*;
-import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.tang.formats.AvroConfigurationSerializer;
import javax.inject.Inject;
import java.io.File;
import java.io.IOException;
-import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -43,24 +37,14 @@ public final class LocalClient {
private static final Logger LOG = Logger.getLogger(LocalClient.class.getName());
private static final String CLIENT_REMOTE_ID = ClientRemoteIdentifier.NONE;
- private final AvroConfigurationSerializer configurationSerializer;
private final PreparedDriverFolderLauncher launcher;
- private final REEFFileNames fileNames;
- private final DriverConfigurationProvider driverConfigurationProvider;
- private final Set<ConfigurationProvider> configurationProviders;
+ private final LocalRuntimeDriverConfigurationGenerator configurationGenerator;
@Inject
- private LocalClient(final AvroConfigurationSerializer configurationSerializer,
- final PreparedDriverFolderLauncher launcher,
- final REEFFileNames fileNames,
- final DriverConfigurationProvider driverConfigurationProvider,
- @Parameter(DriverConfigurationProviders.class)
- final Set<ConfigurationProvider> configurationProviders) {
- this.configurationSerializer = configurationSerializer;
+ private LocalClient(final PreparedDriverFolderLauncher launcher,
+ final LocalRuntimeDriverConfigurationGenerator configurationGenerator) {
this.launcher = launcher;
- this.fileNames = fileNames;
- this.driverConfigurationProvider = driverConfigurationProvider;
- this.configurationProviders = configurationProviders;
+ this.configurationGenerator = configurationGenerator;
}
private void submit(final LocalSubmissionFromCS localSubmissionFromCS) throws IOException {
@@ -70,23 +54,8 @@ public final class LocalClient {
throw new IOException("The Driver folder " + driverFolder.getAbsolutePath() + " doesn't exist.");
}
- final Configuration driverConfiguration1 = driverConfigurationProvider
- .getDriverConfiguration(localSubmissionFromCS.getJobFolder(), CLIENT_REMOTE_ID,
- localSubmissionFromCS.getJobId(), Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER);
- final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder();
- for (final ConfigurationProvider configurationProvider : this.configurationProviders) {
- configurationBuilder.addConfiguration(configurationProvider.getConfiguration());
- }
- final Configuration providedConfigurations = configurationBuilder.build();
- final Configuration driverConfiguration = Configurations.merge(
- driverConfiguration1,
- Tang.Factory.getTang()
- .newConfigurationBuilder()
- .bindNamedParameter(JobSubmissionDirectory.class, driverFolder.toString())
- .build(),
- providedConfigurations);
- final File driverConfigurationFile = new File(driverFolder, fileNames.getDriverConfigurationPath());
- configurationSerializer.toFile(driverConfiguration, driverConfigurationFile);
+ configurationGenerator.writeConfiguration(localSubmissionFromCS.getJobFolder(),
+ localSubmissionFromCS.getJobId(), CLIENT_REMOTE_ID);
launcher.launch(driverFolder, localSubmissionFromCS.getJobId(), CLIENT_REMOTE_ID);
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ebbbeacc/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
new file mode 100644
index 0000000..6d8f931
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.driver.parameters.JobSubmissionDirectory;
+import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.local.client.DriverConfigurationProvider;
+import org.apache.reef.runtime.local.client.PreparedDriverFolderLauncher;
+import org.apache.reef.tang.*;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.AvroConfigurationSerializer;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Does client side manipulation of driver configuration for local runtime.
+ */
+final class LocalRuntimeDriverConfigurationGenerator {
+ private static final Logger LOG = Logger.getLogger(LocalRuntimeDriverConfigurationGenerator.class.getName());
+ private final REEFFileNames fileNames;
+ private final DriverConfigurationProvider driverConfigurationProvider;
+ private final Set<ConfigurationProvider> configurationProviders;
+ private final AvroConfigurationSerializer configurationSerializer;
+
+ @Inject
+ private LocalRuntimeDriverConfigurationGenerator(final AvroConfigurationSerializer configurationSerializer,
+ final REEFFileNames fileNames,
+ final DriverConfigurationProvider driverConfigurationProvider,
+ @Parameter(DriverConfigurationProviders.class)
+ final Set<ConfigurationProvider> configurationProviders){
+ this.fileNames = fileNames;
+ this.driverConfigurationProvider = driverConfigurationProvider;
+ this.configurationProviders = configurationProviders;
+ this.configurationSerializer = configurationSerializer;
+ }
+
+ /**
+ * Writes driver configuration to disk.
+ * @param jobFolder The folder in which the job is staged.
+ * @param jobId id of the job to be submitted
+ * @param clientRemoteId
+ * @return
+ * @throws IOException
+ */
+ public Configuration writeConfiguration(final File jobFolder,
+ final String jobId,
+ final String clientRemoteId) throws IOException {
+ final File driverFolder = new File(jobFolder, PreparedDriverFolderLauncher.DRIVER_FOLDER_NAME);
+
+ final Configuration driverConfiguration1 = driverConfigurationProvider
+ .getDriverConfiguration(jobFolder, clientRemoteId,
+ jobId, Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER);
+ final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder();
+ for (final ConfigurationProvider configurationProvider : this.configurationProviders) {
+ configurationBuilder.addConfiguration(configurationProvider.getConfiguration());
+ }
+ final Configuration providedConfigurations = configurationBuilder.build();
+ final Configuration driverConfiguration = Configurations.merge(
+ driverConfiguration1,
+ Tang.Factory.getTang()
+ .newConfigurationBuilder()
+ .bindNamedParameter(JobSubmissionDirectory.class, driverFolder.toString())
+ .build(),
+ providedConfigurations);
+ final File driverConfigurationFile = new File(driverFolder, fileNames.getDriverConfigurationPath());
+ configurationSerializer.toFile(driverConfiguration, driverConfigurationFile);
+ return driverConfiguration;
+ }
+
+ public static void main(final String[] args) throws InjectionException, IOException {
+ final LocalSubmissionFromCS localSubmission = LocalSubmissionFromCS.fromCommandLine(args);
+ LOG.log(Level.INFO, "Local driver config generation received from C#: {0}", localSubmission);
+ final Configuration localRuntimeConfiguration = localSubmission.getRuntimeConfiguration();
+ final LocalRuntimeDriverConfigurationGenerator localConfigurationGenerator = Tang.Factory.getTang()
+ .newInjector(localRuntimeConfiguration)
+ .getInstance(LocalRuntimeDriverConfigurationGenerator.class);
+ localConfigurationGenerator.writeConfiguration(localSubmission.getJobFolder(),
+ localSubmission.getJobId(), ClientRemoteIdentifier.NONE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ebbbeacc/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnDriverConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnDriverConfigurationGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnDriverConfigurationGenerator.java
new file mode 100644
index 0000000..7206a50
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnDriverConfigurationGenerator.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.client.DriverRestartConfiguration;
+import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.javabridge.generic.JobDriver;
+import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.yarn.driver.JobSubmissionDirectoryProvider;
+import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
+import org.apache.reef.runtime.yarn.driver.YarnDriverRestartConfiguration;
+import org.apache.reef.tang.*;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Does client side manipulation of driver configuration for YARN runtime.
+ */
+final class YarnDriverConfigurationGenerator {
+ private static final Logger LOG = Logger.getLogger(YarnDriverConfigurationGenerator.class.getName());
+ private final Set<ConfigurationProvider> configurationProviders;
+ private final int driverRestartEvaluatorRecoverySeconds;
+ private final REEFFileNames fileNames;
+ private final ConfigurationSerializer configurationSerializer;
+
+ @Inject
+ private YarnDriverConfigurationGenerator(@Parameter(DriverConfigurationProviders.class)
+ final Set<ConfigurationProvider> configurationProviders,
+ @Parameter(SubmissionDriverRestartEvaluatorRecoverySeconds.class)
+ final int driverRestartEvaluatorRecoverySeconds,
+ final ConfigurationSerializer configurationSerializer,
+ final REEFFileNames fileNames) {
+ this.configurationProviders = configurationProviders;
+ this.driverRestartEvaluatorRecoverySeconds = driverRestartEvaluatorRecoverySeconds;
+ this.fileNames = fileNames;
+ this.configurationSerializer = configurationSerializer;
+ }
+
+ /**
+ * Writes driver configuration to disk.
+ * @param driverFolder the folder containing the `reef` folder. Only that `reef` folder will be in the JAR.
+ * @param jobId id of the job to be submitted
+ * @param jobSubmissionFolder job submission folder on DFS
+ * @return the prepared driver configuration
+ * @throws IOException
+ */
+ public Configuration writeConfiguration(final File driverFolder,
+ final String jobId,
+ final String jobSubmissionFolder) throws IOException {
+ final File driverConfigurationFile = new File(driverFolder, this.fileNames.getDriverConfigurationPath());
+ final Configuration yarnDriverConfiguration = YarnDriverConfiguration.CONF
+ .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, jobSubmissionFolder)
+ .set(YarnDriverConfiguration.JOB_IDENTIFIER, jobId)
+ .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE)
+ .set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0)
+ .build();
+
+ final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder();
+ for (final ConfigurationProvider configurationProvider : this.configurationProviders) {
+ configurationBuilder.addConfiguration(configurationProvider.getConfiguration());
+ }
+ final Configuration providedConfigurations = configurationBuilder.build();
+
+ Configuration driverConfiguration = Configurations.merge(
+ Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER,
+ yarnDriverConfiguration,
+ providedConfigurations);
+
+ if (driverRestartEvaluatorRecoverySeconds > 0) {
+ LOG.log(Level.FINE, "Driver restart is enabled.");
+
+ final Configuration yarnDriverRestartConfiguration =
+ YarnDriverRestartConfiguration.CONF.build();
+
+ final Configuration driverRestartConfiguration =
+ DriverRestartConfiguration.CONF
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED, JobDriver.RestartHandler.class)
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE,
+ JobDriver.DriverRestartActiveContextHandler.class)
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
+ JobDriver.DriverRestartRunningTaskHandler.class)
+ .set(DriverRestartConfiguration.DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS,
+ driverRestartEvaluatorRecoverySeconds)
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED,
+ JobDriver.DriverRestartCompletedHandler.class)
+ .set(DriverRestartConfiguration.ON_DRIVER_RESTART_EVALUATOR_FAILED,
+ JobDriver.DriverRestartFailedEvaluatorHandler.class)
+ .build();
+
+ driverConfiguration = Configurations.merge(
+ driverConfiguration, yarnDriverRestartConfiguration, driverRestartConfiguration);
+ }
+
+ this.configurationSerializer.toFile(driverConfiguration, driverConfigurationFile);
+ return driverConfiguration;
+ }
+
+ /**
+ * This main is executed from .NET to perform driver config generation.
+ * For arguments detail:
+ * @see org.apache.reef.bridge.client.YarnSubmissionFromCS#fromCommandLine(String[])
+ */
+ public static void main(final String[] args) throws InjectionException, IOException {
+ final YarnSubmissionFromCS yarnSubmission = YarnSubmissionFromCS.fromCommandLine(args);
+ LOG.log(Level.INFO, "YARN driver config generation received from C#: {0}", yarnSubmission);
+ final Configuration yarnConfiguration = yarnSubmission.getRuntimeConfiguration();
+
+ final Injector injector = Tang.Factory.getTang().newInjector(yarnConfiguration);
+ final YarnDriverConfigurationGenerator yarnClientConfigurationGenerator =
+ injector.getInstance(YarnDriverConfigurationGenerator.class);
+ final JobSubmissionDirectoryProvider directoryProvider = injector.getInstance(JobSubmissionDirectoryProvider.class);
+ final String jobId = yarnSubmission.getJobId();
+
+ LOG.log(Level.INFO, "Writing driver config for job {0}", jobId);
+ yarnClientConfigurationGenerator.writeConfiguration(
+ yarnSubmission.getDriverFolder(), jobId, directoryProvider.getJobSubmissionDirectoryPath(jobId).toString());
+ System.exit(0);
+ LOG.log(Level.INFO, "End of main in Java YarnDriverConfigurationGenerator");
+ }
+}
+
+/**
+ * How long the driver should wait before timing out on evaluator
+ * recovery in seconds. Defaults to -1. If value is negative, the restart functionality will not be
+ * enabled. Only used by .NET job submission.
+ */
+@NamedParameter(doc = "How long the driver should wait before timing out on evaluator" +
+ " recovery in seconds. Defaults to -1. If value is negative, the restart functionality will not be" +
+ " enabled. Only used by .NET job submission.", default_value = "-1")
+final class SubmissionDriverRestartEvaluatorRecoverySeconds implements Name<Integer> {
+ private SubmissionDriverRestartEvaluatorRecoverySeconds() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ebbbeacc/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index 53be5dd..843f677 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -18,19 +18,17 @@
*/
package org.apache.reef.bridge.client;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.reef.client.DriverRestartConfiguration;
-import org.apache.reef.client.parameters.DriverConfigurationProviders;
import org.apache.reef.driver.parameters.MaxApplicationSubmissions;
import org.apache.reef.driver.parameters.ResourceManagerPreserveEvaluators;
-import org.apache.reef.javabridge.generic.JobDriver;
-import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
import org.apache.reef.runtime.common.files.ClasspathProvider;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix;
@@ -38,22 +36,20 @@ import org.apache.reef.runtime.yarn.client.SecurityTokenProvider;
import org.apache.reef.runtime.yarn.client.YarnSubmissionHelper;
import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
import org.apache.reef.runtime.yarn.client.uploader.JobUploader;
-import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
-import org.apache.reef.runtime.yarn.driver.YarnDriverRestartConfiguration;
-import org.apache.reef.tang.*;
-import org.apache.reef.tang.annotations.Name;
-import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.JARFileMaker;
+
import javax.inject.Inject;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
-import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -69,11 +65,10 @@ public final class YarnJobSubmissionClient {
private final REEFFileNames fileNames;
private final YarnConfiguration yarnConfiguration;
private final ClasspathProvider classpath;
- private final Set<ConfigurationProvider> configurationProviders;
private final int maxApplicationSubmissions;
- private final int driverRestartEvaluatorRecoverySeconds;
private final SecurityTokenProvider tokenProvider;
private final List<String> commandPrefixList;
+ private final YarnDriverConfigurationGenerator configurationGenerator;
@Inject
YarnJobSubmissionClient(final JobUploader uploader,
@@ -81,78 +76,21 @@ public final class YarnJobSubmissionClient {
final ConfigurationSerializer configurationSerializer,
final REEFFileNames fileNames,
final ClasspathProvider classpath,
- @Parameter(DriverConfigurationProviders.class)
- final Set<ConfigurationProvider> configurationProviders,
@Parameter(MaxApplicationSubmissions.class)
final int maxApplicationSubmissions,
@Parameter(DriverLaunchCommandPrefix.class)
final List<String> commandPrefixList,
- @Parameter(SubmissionDriverRestartEvaluatorRecoverySeconds.class)
- final int driverRestartEvaluatorRecoverySeconds,
- final SecurityTokenProvider tokenProvider) {
+ final SecurityTokenProvider tokenProvider,
+ final YarnDriverConfigurationGenerator configurationGenerator) {
this.uploader = uploader;
this.configurationSerializer = configurationSerializer;
this.fileNames = fileNames;
this.yarnConfiguration = yarnConfiguration;
this.classpath = classpath;
- this.configurationProviders = configurationProviders;
this.maxApplicationSubmissions = maxApplicationSubmissions;
- this.driverRestartEvaluatorRecoverySeconds = driverRestartEvaluatorRecoverySeconds;
this.tokenProvider = tokenProvider;
this.commandPrefixList = commandPrefixList;
- }
-
- private Configuration addYarnDriverConfiguration(final File driverFolder,
- final String jobId,
- final String jobSubmissionFolder)
- throws IOException {
- final File driverConfigurationFile = new File(driverFolder, this.fileNames.getDriverConfigurationPath());
- final Configuration yarnDriverConfiguration = YarnDriverConfiguration.CONF
- .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, jobSubmissionFolder)
- .set(YarnDriverConfiguration.JOB_IDENTIFIER, jobId)
- .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE)
- .set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0)
- .build();
-
- final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder();
- for (final ConfigurationProvider configurationProvider : this.configurationProviders) {
- configurationBuilder.addConfiguration(configurationProvider.getConfiguration());
- }
- final Configuration providedConfigurations = configurationBuilder.build();
-
- Configuration driverConfiguration = Configurations.merge(
- Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER,
- yarnDriverConfiguration,
- providedConfigurations);
-
- if (driverRestartEvaluatorRecoverySeconds > 0) {
- LOG.log(Level.FINE, "Driver restart is enabled.");
-
- final Configuration yarnDriverRestartConfiguration =
- YarnDriverRestartConfiguration.CONF
- .build();
-
- final Configuration driverRestartConfiguration =
- DriverRestartConfiguration.CONF
- .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED, JobDriver.RestartHandler.class)
- .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE,
- JobDriver.DriverRestartActiveContextHandler.class)
- .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
- JobDriver.DriverRestartRunningTaskHandler.class)
- .set(DriverRestartConfiguration.DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS,
- driverRestartEvaluatorRecoverySeconds)
- .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED,
- JobDriver.DriverRestartCompletedHandler.class)
- .set(DriverRestartConfiguration.ON_DRIVER_RESTART_EVALUATOR_FAILED,
- JobDriver.DriverRestartFailedEvaluatorHandler.class)
- .build();
-
- driverConfiguration = Configurations.merge(
- driverConfiguration, yarnDriverRestartConfiguration, driverRestartConfiguration);
- }
-
- this.configurationSerializer.toFile(driverConfiguration, driverConfigurationFile);
- return driverConfiguration;
+ this.configurationGenerator = configurationGenerator;
}
/**
@@ -181,8 +119,9 @@ public final class YarnJobSubmissionClient {
// Prepare the JAR
final JobFolder jobFolderOnDFS = this.uploader.createJobFolder(submissionHelper.getApplicationId());
final Configuration jobSubmissionConfiguration =
- this.addYarnDriverConfiguration(yarnSubmission.getDriverFolder(), yarnSubmission.getJobId(),
- jobFolderOnDFS.getPath().toString());
+ this.configurationGenerator.writeConfiguration(yarnSubmission.getDriverFolder(),
+ yarnSubmission.getJobId(),
+ jobFolderOnDFS.getPath().toString());
final File jarFile = makeJar(yarnSubmission.getDriverFolder());
LOG.log(Level.INFO, "Created job submission jar file: {0}", jarFile);
@@ -288,13 +227,9 @@ public final class YarnJobSubmissionClient {
}
/**
- * Takes 5 parameters from the C# side:
- * [0]: String. Driver folder.
- * [1]: String. Driver identifier.
- * [2]: int. Driver memory.
- * [3~5]: int. TCP configurations.
- * [6]: int. Max application submissions.
- * [7]: int. Evaluator recovery timeout for driver restart. > 0 => restart is enabled.
+ * .NET client calls into this main method for job submission.
+ * For arguments detail:
+ * @see org.apache.reef.bridge.client.YarnSubmissionFromCS#fromCommandLine(String[])
*/
public static void main(final String[] args) throws InjectionException, IOException, YarnException {
final YarnSubmissionFromCS yarnSubmission = YarnSubmissionFromCS.fromCommandLine(args);
@@ -318,16 +253,3 @@ public final class YarnJobSubmissionClient {
LOG.log(Level.INFO, "End of main in Java YarnJobSubmissionClient");
}
}
-
-/**
- * How long the driver should wait before timing out on evaluator
- * recovery in seconds. Defaults to -1. If value is negative, the restart functionality will not be
- * enabled. Only used by .NET job submission.
- */
-@NamedParameter(doc = "How long the driver should wait before timing out on evaluator" +
- " recovery in seconds. Defaults to -1. If value is negative, the restart functionality will not be" +
- " enabled. Only used by .NET job submission.", default_value = "-1")
-final class SubmissionDriverRestartEvaluatorRecoverySeconds implements Name<Integer> {
- private SubmissionDriverRestartEvaluatorRecoverySeconds() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ebbbeacc/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
index 2ab6c9b..9b4c35b 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
@@ -197,7 +197,7 @@ final class YarnSubmissionFromCS {
}
/**
- * Takes 5 parameters from the C# side:
+ * Takes 9 parameters from the C# side:
* [0]: String. Driver folder.
* [1]: String. Driver identifier.
* [2]: int. Driver memory.