You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ch...@apache.org on 2015/01/12 06:47:40 UTC

incubator-reef git commit: [REEF-55] Delayed creation and upload of the global.jar file until the first Evaluator submission on YARN.

Repository: incubator-reef
Updated Branches:
  refs/heads/master 13cf52250 -> c8db8f8ff


[REEF-55] Delayed creation and upload of the global.jar file until
  the first Evaluator submission on YARN.

  * Added the `GlobalJarUploader` class, which performs the creation
    and upload. It is a `Callable` so that we can move this to a
    thread in a future version.
  * Moved the actual upload logic from `EvaluatorSetupHelper` to
    `UploaderToJobFolder`

JIRA: [REEF-55](https://issues.apache.org/jira/browse/REEF-55)

Pull Request: Closes #28

Author:    Markus Weimer <we...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/c8db8f8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/c8db8f8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/c8db8f8f

Branch: refs/heads/master
Commit: c8db8f8ff72ae5e285938f40e1a6ff6feb0866a6
Parents: 13cf522
Author: Markus Weimer <we...@apache.org>
Authored: Mon Jan 12 14:42:28 2015 +0900
Committer: Brian Cho <ch...@apache.org>
Committed: Mon Jan 12 14:45:25 2015 +0900

----------------------------------------------------------------------
 .../yarn/driver/EvaluatorSetupHelper.java       | 98 +++++---------------
 .../runtime/yarn/driver/GlobalJarUploader.java  | 92 ++++++++++++++++++
 .../yarn/driver/UploaderToJobfolder.java        | 94 +++++++++++++++++++
 3 files changed, 209 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c8db8f8f/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java
----------------------------------------------------------------------
diff --git a/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java b/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java
index 55a7855..3a2ff5c 100644
--- a/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java
+++ b/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java
@@ -18,16 +18,8 @@
  */
 package org.apache.reef.runtime.yarn.driver;
 
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.io.TempFileCreator;
 import org.apache.reef.io.WorkingDirectoryTempFileCreator;
@@ -35,7 +27,6 @@ import org.apache.reef.proto.DriverRuntimeProtocol;
 import org.apache.reef.runtime.common.files.JobJarMaker;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.runtime.common.parameters.DeleteTempFiles;
-import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.annotations.Parameter;
@@ -58,49 +49,41 @@ final class EvaluatorSetupHelper {
 
   private static final Logger LOG = Logger.getLogger(EvaluatorSetupHelper.class.getName());
 
-  private final String jobSubmissionDirectory;
-  private final Map<String, LocalResource> globalResources;
   private final REEFFileNames fileNames;
   private final ConfigurationSerializer configurationSerializer;
-  private final FileSystem fileSystem;
   private final TempFileCreator tempFileCreator;
+  private final UploaderToJobFolder uploader;
+  private final GlobalJarUploader globalJarUploader;
   private final boolean deleteTempFiles;
 
   @Inject
   EvaluatorSetupHelper(
-      final @Parameter(JobSubmissionDirectory.class) String jobSubmissionDirectory,
-      final YarnConfiguration yarnConfiguration,
       final REEFFileNames fileNames,
       final ConfigurationSerializer configurationSerializer,
       final TempFileCreator tempFileCreator,
-      final @Parameter(DeleteTempFiles.class) boolean deleteTempFiles) throws IOException {
+      final @Parameter(DeleteTempFiles.class) boolean deleteTempFiles,
+      final UploaderToJobFolder uploader,
+      final GlobalJarUploader globalJarUploader) throws IOException {
     this.tempFileCreator = tempFileCreator;
     this.deleteTempFiles = deleteTempFiles;
+    this.globalJarUploader = globalJarUploader;
 
-    this.fileSystem = FileSystem.get(yarnConfiguration);
-    this.jobSubmissionDirectory = jobSubmissionDirectory;
     this.fileNames = fileNames;
     this.configurationSerializer = configurationSerializer;
-    this.globalResources = this.setup();
+    this.uploader = uploader;
   }
 
-  public Map<String, LocalResource> getGlobalResources() {
-    return this.globalResources;
+  /**
+   * @return the map to be used in formulating the evaluator launch submission.
+   */
+  Map<String, LocalResource> getGlobalResources() {
+    try {
+      return this.globalJarUploader.call();
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to upload the global JAR file to the job folder.", e);
+    }
   }
 
-  private Map<String, LocalResource> setup() throws IOException {
-    final Map<String, LocalResource> result = new HashMap<>(1);
-    final Path pathToGlobalJar = this.uploadToJobFolder(makeGlobalJar());
-    result.put(this.fileNames.getGlobalFolderPath(), makeLocalResourceForJarFile(pathToGlobalJar));
-    return result;
-  }
-
-  private File makeGlobalJar() throws IOException {
-    final File jarFile = new File(
-        this.fileNames.getGlobalFolderName() + this.fileNames.getJarFileSuffix());
-    new JARFileMaker(jarFile).addChildren(this.fileNames.getGlobalFolder()).close();
-    return jarFile;
-  }
 
   /**
    * Sets up the LocalResources for a new Evaluator.
@@ -109,7 +92,7 @@ final class EvaluatorSetupHelper {
    * @return
    * @throws IOException
    */
-  public Map<String, LocalResource> getResources(
+  Map<String, LocalResource> getResources(
       final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto)
       throws IOException {
 
@@ -119,10 +102,8 @@ final class EvaluatorSetupHelper {
     final File localStagingFolder = this.tempFileCreator.createTempDirectory(this.fileNames.getEvaluatorFolderPrefix());
 
     // Write the configuration
-    final File configurationFile = new File(
-        localStagingFolder, this.fileNames.getEvaluatorConfigurationName());
-    this.configurationSerializer.toFile(
-        makeEvaluatorConfiguration(resourceLaunchProto), configurationFile);
+    final File configurationFile = new File(localStagingFolder, this.fileNames.getEvaluatorConfigurationName());
+    this.configurationSerializer.toFile(makeEvaluatorConfiguration(resourceLaunchProto), configurationFile);
 
     // Copy files to the staging folder
     JobJarMaker.copy(resourceLaunchProto.getFileList(), localStagingFolder);
@@ -133,8 +114,8 @@ final class EvaluatorSetupHelper {
     new JARFileMaker(localFile).addChildren(localStagingFolder).close();
 
     // Upload the JAR to the job folder
-    final Path pathToEvaluatorJar = uploadToJobFolder(localFile);
-    result.put(this.fileNames.getLocalFolderPath(), makeLocalResourceForJarFile(pathToEvaluatorJar));
+    final Path pathToEvaluatorJar = this.uploader.uploadToJobFolder(localFile);
+    result.put(this.fileNames.getLocalFolderPath(), this.uploader.makeLocalResourceForJarFile(pathToEvaluatorJar));
 
     if (this.deleteTempFiles) {
       LOG.log(Level.FINE, "Marking [{0}] for deletion at the exit of this JVM and deleting [{1}]",
@@ -156,44 +137,11 @@ final class EvaluatorSetupHelper {
    * @throws IOException
    */
 
-  private Configuration makeEvaluatorConfiguration(
-      final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto) throws IOException {
+  private Configuration makeEvaluatorConfiguration(final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto)
+      throws IOException {
     return Tang.Factory.getTang()
         .newConfigurationBuilder(this.configurationSerializer.fromString(resourceLaunchProto.getEvaluatorConf()))
         .bindImplementation(TempFileCreator.class, WorkingDirectoryTempFileCreator.class)
         .build();
   }
-
-  /**
-   * Uploads the given file to the job folder on (H)DFS.
-   *
-   * @param file
-   * @return
-   * @throws IOException
-   */
-  private Path uploadToJobFolder(final File file) throws IOException {
-    final Path source = new Path(file.getAbsolutePath());
-    final Path destination = new Path(this.jobSubmissionDirectory + "/" + file.getName());
-    LOG.log(Level.FINE, "Uploading {0} to {1}", new Object[]{source, destination});
-    this.fileSystem.copyFromLocalFile(false, true, source, destination);
-    return destination;
-  }
-
-  /**
-   * Creates a LocalResource instance for the JAR file referenced by the given Path
-   *
-   * @param path
-   * @return
-   * @throws IOException
-   */
-  private LocalResource makeLocalResourceForJarFile(final Path path) throws IOException {
-    final LocalResource localResource = Records.newRecord(LocalResource.class);
-    final FileStatus status = FileContext.getFileContext(this.fileSystem.getUri()).getFileStatus(path);
-    localResource.setType(LocalResourceType.ARCHIVE);
-    localResource.setVisibility(LocalResourceVisibility.APPLICATION);
-    localResource.setResource(ConverterUtils.getYarnUrlFromPath(status.getPath()));
-    localResource.setTimestamp(status.getModificationTime());
-    localResource.setSize(status.getLen());
-    return localResource;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c8db8f8f/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/GlobalJarUploader.java
----------------------------------------------------------------------
diff --git a/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/GlobalJarUploader.java b/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/GlobalJarUploader.java
new file mode 100644
index 0000000..c6ee91e
--- /dev/null
+++ b/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/GlobalJarUploader.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.util.JARFileMaker;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * Creates the JAR file with the global files on the driver and uploads it to the job folder on (H)DFS. The work is
+ * done by the first successful call to call(); later calls return the cached map.
+ */
+final class GlobalJarUploader implements Callable<Map<String, LocalResource>> {
+
+  /**
+   * Supplies the name and path of the global folder and the suffix of JAR files.
+   */
+  private final REEFFileNames fileNames;
+  /**
+   * Holds the map to be used as the "global" resources when submitting Evaluators; filled by call().
+   */
+  private final Map<String, LocalResource> globalResources = new HashMap<>(1);
+  /**
+   * Utility that performs the actual upload and creates the LocalResource for the uploaded file.
+   */
+  private final UploaderToJobFolder uploader;
+  /**
+   * True once globalResources holds the valid information, i.e. after the first successful call to call().
+   */
+  private boolean isDone;
+
+  @Inject
+  GlobalJarUploader(final REEFFileNames fileNames,
+                    final UploaderToJobFolder uploader) {
+    this.fileNames = fileNames;
+    this.uploader = uploader;
+  }
+
+  /**
+   * Creates the JAR file with the global files on the driver and uploads it to the job folder on (H)DFS, unless
+   * an earlier call already succeeded, in which case the cached map is returned without repeating the work.
+   *
+   * @return the map to be used as the "global" resources when submitting Evaluators (the internal instance, no copy).
+   * @throws IOException if the creation of the JAR or the upload fails
+   */
+  @Override
+  public synchronized Map<String, LocalResource> call() throws IOException {
+    if (!this.isDone) {
+      final Path pathToGlobalJar = this.uploader.uploadToJobFolder(makeGlobalJar());
+      globalResources.put(this.fileNames.getGlobalFolderPath(),
+          this.uploader.makeLocalResourceForJarFile(pathToGlobalJar));
+      this.isDone = true; // Set last, so that a failed attempt is retried by the next call.
+    }
+    return this.globalResources;
+  }
+
+  /**
+   * Creates the JAR file for upload. NOTE(review): the JARFileMaker is not closed if addChildren() throws.
+   *
+   * @return the JAR file; its (relative) name is the global folder name followed by the JAR file suffix
+   * @throws IOException if the JAR file cannot be created
+   */
+  private File makeGlobalJar() throws IOException {
+    final File jarFile = new File(this.fileNames.getGlobalFolderName() + this.fileNames.getJarFileSuffix());
+    new JARFileMaker(jarFile).addChildren(this.fileNames.getGlobalFolder()).close();
+    return jarFile;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c8db8f8f/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java
----------------------------------------------------------------------
diff --git a/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java b/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java
new file mode 100644
index 0000000..afc1d9c
--- /dev/null
+++ b/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Uploads files to the current job folder and creates the YARN LocalResource records describing them.
+ */
+final class UploaderToJobFolder {
+  private static final Logger LOG = Logger.getLogger(UploaderToJobFolder.class.getName());
+
+  /**
+   * The path on (H)DFS which is used as the job's folder; uploaded files are placed directly inside it.
+   */
+  private final String jobSubmissionDirectory;
+  /**
+   * The FileSystem obtained from the YarnConfiguration; used for the uploads and the file status lookups.
+   */
+  private final FileSystem fileSystem;
+
+  @Inject
+  UploaderToJobFolder(final @Parameter(JobSubmissionDirectory.class) String jobSubmissionDirectory,
+                      final YarnConfiguration yarnConfiguration) throws IOException {
+    this.jobSubmissionDirectory = jobSubmissionDirectory;
+    this.fileSystem = FileSystem.get(yarnConfiguration);
+  }
+
+  /**
+   * Uploads the given file to the job folder on (H)DFS under its own file name.
+   *
+   * @param file the local file to upload; it is left in place after the copy
+   * @return the destination path of the file inside the job folder
+   * @throws java.io.IOException if the copy to the job folder fails
+   */
+  Path uploadToJobFolder(final File file) throws IOException {
+    final Path source = new Path(file.getAbsolutePath());
+    final Path destination = new Path(this.jobSubmissionDirectory + "/" + file.getName());
+    LOG.log(Level.FINE, "Uploading {0} to {1}", new Object[]{source, destination});
+    this.fileSystem.copyFromLocalFile(false, true, source, destination); // delSrc = false, overwrite = true
+    return destination;
+  }
+
+  /**
+   * Creates a LocalResource instance (type ARCHIVE, visibility APPLICATION) for the JAR file at the given Path.
+   *
+   * @param path the path of the JAR file; its size and modification time are read from its file status
+   * @return a new LocalResource carrying the URL, timestamp and size of that file
+   * @throws IOException if the file status cannot be obtained
+   */
+  LocalResource makeLocalResourceForJarFile(final Path path) throws IOException {
+    final LocalResource localResource = Records.newRecord(LocalResource.class);
+    final FileStatus status = FileContext.getFileContext(this.fileSystem.getUri()).getFileStatus(path);
+    localResource.setType(LocalResourceType.ARCHIVE);
+    localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+    localResource.setResource(ConverterUtils.getYarnUrlFromPath(status.getPath()));
+    localResource.setTimestamp(status.getModificationTime());
+    localResource.setSize(status.getLen());
+    return localResource;
+  }
+}