You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ch...@apache.org on 2015/01/12 06:47:40 UTC
incubator-reef git commit: [REEF-55] Delayed creation and upload of
the global.jar file until the first Evaluator submission on YARN.
Repository: incubator-reef
Updated Branches:
refs/heads/master 13cf52250 -> c8db8f8ff
[REEF-55] Delayed creation and upload of the global.jar file until
the first Evaluator submission on YARN.
* Added the `GlobalJarUploader` class, which performs the creation
and upload. It is a `Callable` so that we can move this work to a
thread in a future version.
* Moved the actual upload logic from `EvaluatorSetupHelper` to
`UploaderToJobFolder`.
JIRA: [REEF-55](https://issues.apache.org/jira/browse/REEF-55)
Pull Request: Closes #28
Author: Markus Weimer <we...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/c8db8f8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/c8db8f8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/c8db8f8f
Branch: refs/heads/master
Commit: c8db8f8ff72ae5e285938f40e1a6ff6feb0866a6
Parents: 13cf522
Author: Markus Weimer <we...@apache.org>
Authored: Mon Jan 12 14:42:28 2015 +0900
Committer: Brian Cho <ch...@apache.org>
Committed: Mon Jan 12 14:45:25 2015 +0900
----------------------------------------------------------------------
.../yarn/driver/EvaluatorSetupHelper.java | 98 +++++---------------
.../runtime/yarn/driver/GlobalJarUploader.java | 92 ++++++++++++++++++
.../yarn/driver/UploaderToJobfolder.java | 94 +++++++++++++++++++
3 files changed, 209 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c8db8f8f/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java
----------------------------------------------------------------------
diff --git a/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java b/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java
index 55a7855..3a2ff5c 100644
--- a/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java
+++ b/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java
@@ -18,16 +18,8 @@
*/
package org.apache.reef.runtime.yarn.driver;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.io.TempFileCreator;
import org.apache.reef.io.WorkingDirectoryTempFileCreator;
@@ -35,7 +27,6 @@ import org.apache.reef.proto.DriverRuntimeProtocol;
import org.apache.reef.runtime.common.files.JobJarMaker;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.parameters.DeleteTempFiles;
-import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
@@ -58,49 +49,41 @@ final class EvaluatorSetupHelper {
private static final Logger LOG = Logger.getLogger(EvaluatorSetupHelper.class.getName());
- private final String jobSubmissionDirectory;
- private final Map<String, LocalResource> globalResources;
private final REEFFileNames fileNames;
private final ConfigurationSerializer configurationSerializer;
- private final FileSystem fileSystem;
private final TempFileCreator tempFileCreator;
+ private final UploaderToJobFolder uploader;
+ private final GlobalJarUploader globalJarUploader;
private final boolean deleteTempFiles;
@Inject
EvaluatorSetupHelper(
- final @Parameter(JobSubmissionDirectory.class) String jobSubmissionDirectory,
- final YarnConfiguration yarnConfiguration,
final REEFFileNames fileNames,
final ConfigurationSerializer configurationSerializer,
final TempFileCreator tempFileCreator,
- final @Parameter(DeleteTempFiles.class) boolean deleteTempFiles) throws IOException {
+ final @Parameter(DeleteTempFiles.class) boolean deleteTempFiles,
+ final UploaderToJobFolder uploader,
+ final GlobalJarUploader globalJarUploader) throws IOException {
this.tempFileCreator = tempFileCreator;
this.deleteTempFiles = deleteTempFiles;
+ this.globalJarUploader = globalJarUploader;
- this.fileSystem = FileSystem.get(yarnConfiguration);
- this.jobSubmissionDirectory = jobSubmissionDirectory;
this.fileNames = fileNames;
this.configurationSerializer = configurationSerializer;
- this.globalResources = this.setup();
+ this.uploader = uploader;
}
- public Map<String, LocalResource> getGlobalResources() {
- return this.globalResources;
+ /**
+ * @return the map to be used in formulating the evaluator launch submission.
+ */
+ Map<String, LocalResource> getGlobalResources() {
+ try {
+ return this.globalJarUploader.call();
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to upload the global JAR file to the job folder.", e);
+ }
}
- private Map<String, LocalResource> setup() throws IOException {
- final Map<String, LocalResource> result = new HashMap<>(1);
- final Path pathToGlobalJar = this.uploadToJobFolder(makeGlobalJar());
- result.put(this.fileNames.getGlobalFolderPath(), makeLocalResourceForJarFile(pathToGlobalJar));
- return result;
- }
-
- private File makeGlobalJar() throws IOException {
- final File jarFile = new File(
- this.fileNames.getGlobalFolderName() + this.fileNames.getJarFileSuffix());
- new JARFileMaker(jarFile).addChildren(this.fileNames.getGlobalFolder()).close();
- return jarFile;
- }
/**
* Sets up the LocalResources for a new Evaluator.
@@ -109,7 +92,7 @@ final class EvaluatorSetupHelper {
* @return
* @throws IOException
*/
- public Map<String, LocalResource> getResources(
+ Map<String, LocalResource> getResources(
final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto)
throws IOException {
@@ -119,10 +102,8 @@ final class EvaluatorSetupHelper {
final File localStagingFolder = this.tempFileCreator.createTempDirectory(this.fileNames.getEvaluatorFolderPrefix());
// Write the configuration
- final File configurationFile = new File(
- localStagingFolder, this.fileNames.getEvaluatorConfigurationName());
- this.configurationSerializer.toFile(
- makeEvaluatorConfiguration(resourceLaunchProto), configurationFile);
+ final File configurationFile = new File(localStagingFolder, this.fileNames.getEvaluatorConfigurationName());
+ this.configurationSerializer.toFile(makeEvaluatorConfiguration(resourceLaunchProto), configurationFile);
// Copy files to the staging folder
JobJarMaker.copy(resourceLaunchProto.getFileList(), localStagingFolder);
@@ -133,8 +114,8 @@ final class EvaluatorSetupHelper {
new JARFileMaker(localFile).addChildren(localStagingFolder).close();
// Upload the JAR to the job folder
- final Path pathToEvaluatorJar = uploadToJobFolder(localFile);
- result.put(this.fileNames.getLocalFolderPath(), makeLocalResourceForJarFile(pathToEvaluatorJar));
+ final Path pathToEvaluatorJar = this.uploader.uploadToJobFolder(localFile);
+ result.put(this.fileNames.getLocalFolderPath(), this.uploader.makeLocalResourceForJarFile(pathToEvaluatorJar));
if (this.deleteTempFiles) {
LOG.log(Level.FINE, "Marking [{0}] for deletion at the exit of this JVM and deleting [{1}]",
@@ -156,44 +137,11 @@ final class EvaluatorSetupHelper {
* @throws IOException
*/
- private Configuration makeEvaluatorConfiguration(
- final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto) throws IOException {
+ private Configuration makeEvaluatorConfiguration(final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto)
+ throws IOException {
return Tang.Factory.getTang()
.newConfigurationBuilder(this.configurationSerializer.fromString(resourceLaunchProto.getEvaluatorConf()))
.bindImplementation(TempFileCreator.class, WorkingDirectoryTempFileCreator.class)
.build();
}
-
- /**
- * Uploads the given file to the job folder on (H)DFS.
- *
- * @param file
- * @return
- * @throws IOException
- */
- private Path uploadToJobFolder(final File file) throws IOException {
- final Path source = new Path(file.getAbsolutePath());
- final Path destination = new Path(this.jobSubmissionDirectory + "/" + file.getName());
- LOG.log(Level.FINE, "Uploading {0} to {1}", new Object[]{source, destination});
- this.fileSystem.copyFromLocalFile(false, true, source, destination);
- return destination;
- }
-
- /**
- * Creates a LocalResource instance for the JAR file referenced by the given Path
- *
- * @param path
- * @return
- * @throws IOException
- */
- private LocalResource makeLocalResourceForJarFile(final Path path) throws IOException {
- final LocalResource localResource = Records.newRecord(LocalResource.class);
- final FileStatus status = FileContext.getFileContext(this.fileSystem.getUri()).getFileStatus(path);
- localResource.setType(LocalResourceType.ARCHIVE);
- localResource.setVisibility(LocalResourceVisibility.APPLICATION);
- localResource.setResource(ConverterUtils.getYarnUrlFromPath(status.getPath()));
- localResource.setTimestamp(status.getModificationTime());
- localResource.setSize(status.getLen());
- return localResource;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c8db8f8f/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/GlobalJarUploader.java
----------------------------------------------------------------------
diff --git a/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/GlobalJarUploader.java b/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/GlobalJarUploader.java
new file mode 100644
index 0000000..c6ee91e
--- /dev/null
+++ b/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/GlobalJarUploader.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.util.JARFileMaker;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * Utility class that creates the JAR file with the global files on the driver and then uploads it to the job folder on
+ * (H)DFS. The work is done lazily on the first successful call to call(); the result is cached afterwards.
+ */
+final class GlobalJarUploader implements Callable<Map<String, LocalResource>> {
+
+ /**
+ * Used for the file system constants.
+ */
+ private final REEFFileNames fileNames;
+ /**
+ * This will hold the actual map to be used as the "global" resources when submitting Evaluators.
+ */
+ private final Map<String, LocalResource> globalResources = new HashMap<>(1);
+ /**
+ * Utility to actually perform the upload and to create the LocalResource for the uploaded JAR.
+ */
+ private final UploaderToJobFolder uploader;
+ /**
+ * True once globalResources holds the valid result, which is cached after the first successful call to call().
+ */
+ private boolean isDone;
+
+ @Inject
+ GlobalJarUploader(final REEFFileNames fileNames,
+ final UploaderToJobFolder uploader) {
+ this.fileNames = fileNames;
+ this.uploader = uploader;
+ }
+
+ /**
+ * Creates the JAR file with the global files on the driver and then uploads it to the job folder on
+ * (H)DFS on the first successful invocation; later invocations return the cached map. Synchronized, so a failed call is simply retried by the next caller.
+ *
+ * @return the map to be used as the "global" resources when submitting Evaluators. NOTE(review): this is the internal, mutable cached map; callers must not modify it.
+ * @throws IOException if the creation of the JAR, the upload, or the lookup of the uploaded file's status fails
+ */
+ @Override
+ public synchronized Map<String, LocalResource> call() throws IOException {
+ if (!this.isDone) {
+ final Path pathToGlobalJar = this.uploader.uploadToJobFolder(makeGlobalJar());
+ globalResources.put(this.fileNames.getGlobalFolderPath(),
+ this.uploader.makeLocalResourceForJarFile(pathToGlobalJar));
+ this.isDone = true;
+ }
+ return this.globalResources;
+ }
+
+ /**
+ * Creates the JAR file for upload from the children of the global folder.
+ *
+ * @return the JAR file, created at a path relative to the JVM's current working directory.
+ * @throws IOException if creating the JAR file fails.
+ */
+ private File makeGlobalJar() throws IOException {
+ final File jarFile = new File(this.fileNames.getGlobalFolderName() + this.fileNames.getJarFileSuffix());
+ new JARFileMaker(jarFile).addChildren(this.fileNames.getGlobalFolder()).close();
+ return jarFile;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c8db8f8f/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java
----------------------------------------------------------------------
diff --git a/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java b/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java
new file mode 100644
index 0000000..afc1d9c
--- /dev/null
+++ b/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Uploads files to the current job folder and creates YARN LocalResource records for them. NOTE(review): declared in UploaderToJobfolder.java, whose name differs in case from this class.
+ */
+final class UploaderToJobFolder {
+ private static final Logger LOG = Logger.getLogger(UploaderToJobFolder.class.getName());
+
+ /**
+ * The path on (H)DFS which is used as the job's folder.
+ */
+ private final String jobSubmissionDirectory;
+ /**
+ * The FileSystem instance to use for fs operations.
+ */
+ private final FileSystem fileSystem;
+
+ @Inject
+ UploaderToJobFolder(final @Parameter(JobSubmissionDirectory.class) String jobSubmissionDirectory,
+ final YarnConfiguration yarnConfiguration) throws IOException {
+ this.jobSubmissionDirectory = jobSubmissionDirectory;
+ this.fileSystem = FileSystem.get(yarnConfiguration);
+ }
+
+ /**
+ * Uploads the given file to the job folder on (H)DFS, keeping the local file and overwriting any existing destination.
+ *
+ * @param file the local file to upload; its name is reused as the destination file name.
+ * @return the destination Path inside the job submission directory.
+ * @throws java.io.IOException if the copy to the file system fails.
+ */
+ Path uploadToJobFolder(final File file) throws IOException {
+ final Path source = new Path(file.getAbsolutePath());
+ final Path destination = new Path(this.jobSubmissionDirectory + "/" + file.getName());
+ LOG.log(Level.FINE, "Uploading {0} to {1}", new Object[]{source, destination});
+ this.fileSystem.copyFromLocalFile(false, true, source, destination);
+ return destination;
+ }
+
+ /**
+ * Creates a LocalResource instance for the JAR file referenced by the given Path.
+ *
+ * @param path the path of the (already uploaded) JAR file on the file system.
+ * @return a LocalResource of type ARCHIVE with APPLICATION visibility, using the file's current timestamp and size.
+ * @throws IOException if the FileContext or the file status cannot be obtained.
+ */
+ LocalResource makeLocalResourceForJarFile(final Path path) throws IOException {
+ final LocalResource localResource = Records.newRecord(LocalResource.class);
+ final FileStatus status = FileContext.getFileContext(this.fileSystem.getUri()).getFileStatus(path);
+ localResource.setType(LocalResourceType.ARCHIVE);
+ localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+ localResource.setResource(ConverterUtils.getYarnUrlFromPath(status.getPath()));
+ localResource.setTimestamp(status.getModificationTime());
+ localResource.setSize(status.getLen());
+ return localResource;
+ }
+}