Posted to commits@giraph.apache.org by er...@apache.org on 2013/04/03 06:17:42 UTC

[1/2] GIRAPH-13: Port Giraph to YARN

Updated Branches:
  refs/heads/trunk 67f5f7475 -> b2dff2751


http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java
new file mode 100644
index 0000000..d596413
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.yarn;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.GraphTaskManager;
+
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * This process will execute the BSP graph tasks allotted to this YARN
+ * execution container. All tasks will be performed by calling the
+ * GraphTaskManager object. Since this GiraphYarnTask does not pass data
+ * as key-value pairs through the MR framework, the Mapper parameter types
+ * are irrelevant and are set to the <code>Object</code> type.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class GiraphYarnTask<I extends WritableComparable, V extends Writable,
+    E extends Writable, M extends Writable> {
+  static {
+    Configuration.addDefaultResource(GiraphConstants.GIRAPH_YARN_CONF_FILE);
+  }
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(GiraphYarnTask.class);
+  /** Manage the framework-agnostic Giraph task for this job run */
+  private GraphTaskManager<I, V, E, M> graphTaskManager;
+  /** Giraph task ID number must start at index 0. Used by ZK, BSP, etc. */
+  private final int bspTaskId;
+  /** A special "dummy" override of Mapper#Context, used to deliver MRv1 deps */
+  private Context proxy;
+  /** Configuration to hand off into Giraph, through wrapper Mapper#Context */
+  private ImmutableClassesGiraphConfiguration conf;
+
+  /**
+   * Constructor. Build our DUMMY MRv1 data structures to pass to our
+   * GraphTaskManager. This allows us to continue to look the other way
+   * while Giraph relies on MRv1 under the hood.
+   * @param taskAttemptId the MRv1 TaskAttemptID we constructed from CLI args
+   *                      supplied by GiraphApplicationMaster.
+   */
+  public GiraphYarnTask(final TaskAttemptID taskAttemptId) {
+    conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+      new GiraphConfiguration());
+    bspTaskId = taskAttemptId.getTaskID().getId();
+    conf.setInt("mapred.task.partition", bspTaskId);
+    proxy = buildProxyMapperContext(taskAttemptId);
+    graphTaskManager = new GraphTaskManager<I, V, E, M>(proxy);
+  }
+
+  /**
+   * Run one Giraph worker (or master) task, hosted in this execution container.
+   */
+  public void run() {
+    // Notify the master sooner of a worker failure, rather than waiting
+    // for ZooKeeper to time out and delete the ephemeral znodes
+    try {
+      graphTaskManager.setup(null); // defaults GTM to "assume fatjar mode"
+      graphTaskManager.execute();
+      graphTaskManager.cleanup();
+    } catch (InterruptedException ie) {
+      LOG.error("run() caught an unrecoverable InterruptedException.", ie);
+    } catch (IOException ioe) {
+      throw new RuntimeException(
+        "run() caught an unrecoverable IOException.", ioe);
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (RuntimeException e) {
+      // CHECKSTYLE: resume IllegalCatch
+      graphTaskManager.zooKeeperCleanup();
+      graphTaskManager.workerFailureCleanup();
+      throw new RuntimeException(
+        "run: Caught an unrecoverable exception " + e.getMessage(), e);
+    } finally {
+      // YARN: we must commit the final output ourselves;
+      // the MR framework isn't here to do it for us.
+      finalizeYarnJob();
+    }
+  }
+
+  /**
+   * Without Hadoop MR to consolidate all the task output from each HDFS
+   * task tmp dir, it won't get done. In a pure YARN job we must do this
+   * finalization ourselves, AND delete the HDFS "jar cache" too!
+   */
+  private void finalizeYarnJob() {
+    if (conf.isPureYarnJob() && graphTaskManager.isMaster() &&
+      conf.getVertexOutputFormatClass() != null) {
+      try {
+        LOG.info("Master is ready to commit final job output data.");
+        VertexOutputFormat vertexOutputFormat =
+          conf.createVertexOutputFormat();
+        OutputCommitter outputCommitter =
+          vertexOutputFormat.getOutputCommitter(proxy);
+        // now we will have our output in OUTDIR if all went well...
+        outputCommitter.commitJob(proxy);
+        LOG.info("Master has committed the final job output data.");
+      } catch (InterruptedException ie) {
+        LOG.error("Interrupted while attempting to obtain " +
+          "OutputCommitter.", ie);
+      } catch (IOException ioe) {
+        LOG.error("Master task's attempt to commit output has " +
+          "FAILED.", ioe);
+      }
+    }
+  }
+
+  /**
+   * Utility to generate dummy Mapper#Context for use in Giraph internals.
+   * This is the "key hack" to inject MapReduce-related data structures
+   * containing YARN cluster metadata (and our GiraphConf from the AppMaster)
+   * into our Giraph BSP task code.
+   * @param tid the TaskAttemptID to construct this Mapper#Context from.
+   * @return sort of a Mapper#Context if you squint just right.
+   */
+  private Context buildProxyMapperContext(final TaskAttemptID tid) {
+    MapContext mc = new MapContextImpl<Object, Object, Object, Object>(
+      conf, // our Configuration, populated back at the GiraphYarnClient.
+      tid,  // our TaskAttemptId, generated w/YARN app, container, attempt IDs
+      null, // RecordReader here will never be used by Giraph
+      null, // RecordWriter here will never be used by Giraph
+      null, // OutputCommitter here will never be used by Giraph
+      new TaskAttemptContextImpl.DummyReporter() { // goes in task logs for now
+        @Override
+        public void setStatus(String msg) {
+          LOG.info("[STATUS: task-" + bspTaskId + "] " + msg);
+        }
+      },
+      null); // Input split setting here will never be used by Giraph
+
+    // now, we wrap our MapContext ref so we can produce a Mapper#Context
+    WrappedMapper<Object, Object, Object, Object> wrappedMapper
+      = new WrappedMapper<Object, Object, Object, Object>();
+    return wrappedMapper.getMapContext(mc);
+  }
+
+  /**
+   * Default handler for uncaught exceptions.
+   */
+  class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
+    @Override
+    public void uncaughtException(final Thread t, final Throwable e) {
+      LOG.fatal(
+        "uncaughtException: OverrideExceptionHandler on thread " +
+         t.getName() + ", msg = " +  e.getMessage() + ", exiting...", e);
+      System.exit(1);
+    }
+  }
+
+  /**
+   * Task entry point.
+   * @param args CLI arguments injected by GiraphApplicationMaster to hand off
+   *             job, task, and attempt IDs to this (and every) Giraph task.
+   *             Args should be: <code>AppTimestamp AppId ContainerId
+   *             AppAttemptId</code>
+   */
+  @SuppressWarnings("rawtypes")
+  public static void main(String[] args) {
+    if (args.length != 4) {
+      throw new IllegalStateException("GiraphYarnTask could not construct " +
+        "a TaskAttemptID for the Giraph job from args: " + printArgs(args));
+    }
+    try {
+      GiraphYarnTask<?, ?, ?, ?> giraphYarnTask =
+        new GiraphYarnTask(getTaskAttemptID(args));
+      giraphYarnTask.run();
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (Throwable t) {
+      // CHECKSTYLE: resume IllegalCatch
+      LOG.error("GiraphYarnTask threw a top-level exception, failing task", t);
+      System.exit(2);
+    } // ALWAYS finish a YARN task or AppMaster with System#exit!!!
+    System.exit(0);
+  }
+
+  /**
+   * Utility to create a TaskAttemptId we can feed to our fake Mapper#Context.
+   *
+   * NOTE: ContainerId will serve as MR TaskID for Giraph tasks.
+   * YARN container 1 is always AppMaster, so the least container id we will
+ * ever get from YARN for a Giraph task is container id 2. Giraph task IDs
+ * (as in MapReduce) must start at index 0, so we SUBTRACT TWO from each
+ * container id.
+   *
+   * @param args the command line args, fed to us by GiraphApplicationMaster.
+   * @return the TaskAttemptId object, populated with YARN job data.
+   */
+  private static TaskAttemptID getTaskAttemptID(String[] args) {
+    return new TaskAttemptID(
+      args[0], // YARN ApplicationId Cluster Timestamp
+      Integer.parseInt(args[1]), // YARN ApplicationId #
+      TaskID.getTaskType('m'),  // Make Giraph think this is a Mapper task.
+      Integer.parseInt(args[2]) - 2, // YARN ContainerId MINUS TWO (see above)
+      Integer.parseInt(args[3])); // YARN AppAttemptId #
+  }
+
+  /**
+   * Utility to help log command line args in the event of an error.
+   * @param args the CLI args.
+   * @return a pretty-print of the input args.
+   */
+  private static String printArgs(String[] args) {
+    int count = 0;
+    StringBuilder sb = new StringBuilder();
+    for (String arg : args) {
+      sb.append("arg[" + (count++) + "] == " + arg + ", ");
+    }
+    return sb.toString();
+  }
+}
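
To make the container-id arithmetic concrete, here is a minimal sketch of the
handoff getTaskAttemptID() performs. The timestamp, app id, and attempt number
below are hypothetical, not taken from this commit:

  // Hypothetical args handed over by GiraphApplicationMaster for the
  // first Giraph task (container 2):
  //   args = { "1364968500000", "1", "2", "1" }
  // getTaskAttemptID(args) then builds the equivalent of:
  TaskAttemptID tid = new TaskAttemptID(
      "1364968500000", 1,       // YARN app cluster timestamp and app #
      TaskID.getTaskType('m'),  // masquerade as a Mapper task
      2 - 2,                    // container id 2 -> Giraph task 0
      1);                       // application attempt #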

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/yarn/YarnUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/YarnUtils.java b/giraph-core/src/main/java/org/apache/giraph/yarn/YarnUtils.java
new file mode 100644
index 0000000..aa042e8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/YarnUtils.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.yarn;
+
+import com.google.common.collect.Sets;
+import java.io.FileOutputStream;
+import java.util.Set;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.log4j.Logger;
+
+/**
+ * Utilities that can only compile with versions of Hadoop that support YARN,
+ * so they live here instead of o.a.g.utils package.
+ */
+public class YarnUtils {
+  /** Class Logger */
+  private static final Logger LOG = Logger.getLogger(YarnUtils.class);
+  /** Default dir on HDFS (or equivalent) where LocalResources are stored */
+  private static final String HDFS_RESOURCE_DIR = "giraph_yarn_jar_cache";
+
+  /** Private constructor, this is a utility class only */
+  private YarnUtils() { /* no-op */ }
+
+  /**
+   * Populates the LocalResources list with the HDFS paths listed in
+   * the conf under GiraphConstants.GIRAPH_YARN_LIBJARS, and the
+   * GiraphConfiguration for this job. Also adds the Giraph default application
+   * jar as determined by GiraphYarnClient.GIRAPH_CLIENT_JAR constant.
+   * @param map the LocalResources list to populate.
+   * @param giraphConf the configuration to use to select jars to include.
+   * @param appId the ApplicationId, naming the HDFS base dir for job jars.
+   */
+  public static void addFsResourcesToMap(Map<String, LocalResource> map,
+    GiraphConfiguration giraphConf, ApplicationId appId) throws IOException {
+    FileSystem fs = FileSystem.get(giraphConf);
+    Path baseDir = YarnUtils.getFsCachePath(fs, appId);
+    boolean coreJarFound = false;
+    for (String fileName : giraphConf.getYarnLibJars().split(",")) {
+      if (fileName.length() > 0) {
+        Path filePath = new Path(baseDir, fileName);
+        LOG.info("Adding " + fileName + " to LocalResources for export.");
+        if (fileName.contains("giraph-core")) {
+          coreJarFound = true;
+        }
+        addFileToResourceMap(map, fs, filePath);
+      }
+    }
+    if (!coreJarFound) { // OK if you are running giraph-examples-jar-with-deps
+      LOG.warn("Job jars (-yj option) didn't include giraph-core.");
+    }
+    Path confPath = new Path(baseDir, GiraphConstants.GIRAPH_YARN_CONF_FILE);
+    addFileToResourceMap(map, fs, confPath);
+  }
+
+  /**
+   * Utility function to locate local JAR files and other resources
+   * recursively in the dirs on the local CLASSPATH. Once all the files
+   * named in <code>fileNames</code> are found, we stop and return the results.
+   * @param fileNames the file names of the jars, without path information.
+   * @return a set of Paths to the jar files requested in fileNames.
+   */
+  public static Set<Path> getLocalFiles(final Set<String> fileNames) {
+    Set<Path> jarPaths = Sets.newHashSet();
+    String classPath = ".:" + System.getenv("HADOOP_HOME");
+    if (classPath.length() > 2) {
+      classPath += ":";
+    }
+    classPath += System.getenv("CLASSPATH");
+    for (String baseDir : classPath.split(":")) {
+      if (baseDir.length() > 0) {
+        // lose the globbing chars that will fail in File#listFiles
+        final int lastFileSep = baseDir.lastIndexOf("/");
+        if (lastFileSep > 0) {
+          String test = baseDir.substring(lastFileSep);
+          if (test.contains("*")) {
+            baseDir = baseDir.substring(0, lastFileSep);
+          }
+        }
+        populateJarList(new File(baseDir), jarPaths, fileNames);
+      }
+      if (jarPaths.size() >= fileNames.size()) {
+        break; // found a resource for each name in the input set, all done
+      }
+    }
+    return jarPaths;
+  }
+
+  /**
+   * Start in the working directory and recursively locate all jars.
+   * @param dir current directory to explore.
+   * @param fileSet the list to populate.
+   * @param fileNames file names to locate.
+   */
+  private static void populateJarList(final File dir,
+    final Set<Path> fileSet, final Set<String> fileNames) {
+    File[] filesInThisDir = dir.listFiles();
+    if (null == filesInThisDir) {
+      return;
+    }
+    for (File f : filesInThisDir) {
+      if (f.isDirectory()) {
+        populateJarList(f, fileSet, fileNames);
+      } else if (f.isFile() && fileNames.contains(f.getName())) {
+        fileSet.add(new Path(f.getAbsolutePath()));
+      }
+    }
+  }
+
+  /**
+   * Boilerplate to add a file to the local resources.
+   * @param localResources the LocalResources map to populate.
+   * @param fs handle to the HDFS file system.
+   * @param target the file to send to the remote container.
+   */
+  public static void addFileToResourceMap(Map<String, LocalResource>
+    localResources, FileSystem fs, Path target)
+    throws IOException {
+    LocalResource resource = Records.newRecord(LocalResource.class);
+    FileStatus destStatus = fs.getFileStatus(target);
+    resource.setResource(ConverterUtils.getYarnUrlFromURI(target.toUri()));
+    resource.setSize(destStatus.getLen());
+    resource.setTimestamp(destStatus.getModificationTime());
+    resource.setType(LocalResourceType.FILE); // use FILE, even for jars!
+    resource.setVisibility(LocalResourceVisibility.APPLICATION);
+    localResources.put(target.getName(), resource);
+    LOG.info("Registered file in LocalResources: " + target.getName());
+  }
+
+  /**
+   * Get the base HDFS dir we will be storing our LocalResources in.
+   * @param fs the file system.
+   * @param appId the ApplicationId under which our resources will be stored.
+   * @return the path
+   */
+  public static Path getFsCachePath(final FileSystem fs,
+    final ApplicationId appId) {
+    return new Path(fs.getHomeDirectory(), HDFS_RESOURCE_DIR + "/" + appId);
+  }
+
+  /**
+   * Populate the environment string map to be added to the environment vars
+   * in a remote execution container. Adds the local classpath to pick up
+   * "yarn-site.xml" and "mapred-site.xml" settings.
+   * @param env the map of env var values.
+   * @param giraphConf the GiraphConfiguration to pull values from.
+   */
+  public static void addLocalClasspathToEnv(final Map<String, String> env,
+    final GiraphConfiguration giraphConf) {
+    StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*");
+    for (String cpEntry : giraphConf.getStrings(
+      YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+      YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+      classPathEnv.append(':').append(cpEntry.trim());
+    }
+    for (String cpEntry : giraphConf.getStrings(
+      MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
+      MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH)) {
+      classPathEnv.append(':').append(cpEntry.trim());
+    }
+    // add the runtime classpath needed for tests to work
+    if (giraphConf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+      classPathEnv.append(':').append(System.getenv("CLASSPATH"));
+    }
+    env.put("CLASSPATH", classPathEnv.toString());
+  }
+
+  /**
+   * Populate the LocalResources list with the GiraphConf XML file's HDFS path.
+   * @param giraphConf the GiraphConfiguration to export for worker tasks.
+   * @param appId the ApplicationId for this YARN app.
+   * @param localResourceMap the LocalResource map of files to export to tasks.
+   */
+  public static void addGiraphConfToLocalResourceMap(GiraphConfiguration
+    giraphConf, ApplicationId appId, Map<String, LocalResource>
+    localResourceMap) throws IOException {
+    FileSystem fs = FileSystem.get(giraphConf);
+    Path hdfsConfPath = new Path(YarnUtils.getFsCachePath(fs, appId),
+      GiraphConstants.GIRAPH_YARN_CONF_FILE);
+    YarnUtils.addFileToResourceMap(localResourceMap, fs, hdfsConfPath);
+  }
+
+  /**
+   * Export our populated GiraphConfiguration as an XML file to be used by the
+   * ApplicationMaster's exec container, and register it with LocalResources.
+   * @param giraphConf the current Configuration object to be published.
+   * @param appId the ApplicationId to stamp this app's base HDFS resources dir.
+   */
+  public static void exportGiraphConfiguration(GiraphConfiguration giraphConf,
+    ApplicationId appId) throws IOException {
+    File confFile = new File(System.getProperty("java.io.tmpdir"),
+      GiraphConstants.GIRAPH_YARN_CONF_FILE);
+    if (confFile.exists()) {
+      confFile.delete();
+    }
+    String localConfPath = confFile.getAbsolutePath();
+    FileOutputStream fos = null;
+    try {
+      fos = new FileOutputStream(localConfPath);
+      giraphConf.writeXml(fos);
+      FileSystem fs = FileSystem.get(giraphConf);
+      Path hdfsConfPath = new Path(YarnUtils.getFsCachePath(fs, appId),
+        GiraphConstants.GIRAPH_YARN_CONF_FILE);
+      fos.flush();
+      fs.copyFromLocalFile(false, true, new Path(localConfPath), hdfsConfPath);
+    } finally {
+      if (null != fos) {
+        fos.close();
+      }
+    }
+  }
+}
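
Taken together, these helpers cover the resource-localization half of a pure
YARN submission. A rough usage sketch, assuming the stock YARN 2.0.x records
API and that giraphConf and appId are already in scope (the real wiring lives
in GiraphYarnClient and GiraphApplicationMaster, elsewhere in this commit):

  // Sketch only: assemble a container launch context from YarnUtils.
  Map<String, LocalResource> resources =
      new HashMap<String, LocalResource>();
  YarnUtils.exportGiraphConfiguration(giraphConf, appId); // conf XML -> HDFS
  YarnUtils.addFsResourcesToMap(resources, giraphConf, appId);
  Map<String, String> env = new HashMap<String, String>();
  YarnUtils.addLocalClasspathToEnv(env, giraphConf);
  ContainerLaunchContext ctx =
      Records.newRecord(ContainerLaunchContext.class);
  ctx.setLocalResources(resources); // ship jars + giraph-conf.xml to tasks
  ctx.setEnvironment(env);          // CLASSPATH for the remote JVM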

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/yarn/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/package-info.java b/giraph-core/src/main/java/org/apache/giraph/yarn/package-info.java
new file mode 100644
index 0000000..c8a3683
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Catch-all package for YARN-specific code. Only compiles under the Maven
+ * hadoop_yarn profile.
+ */
+package org.apache.giraph.yarn;

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java b/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java
new file mode 100644
index 0000000..efd1179
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.yarn;
+
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import junit.framework.Assert;
+import org.apache.commons.io.FileUtils;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.formats.GiraphFileInputFormat;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.io.formats.IntIntNullIntTextInputFormat;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+
+import org.junit.Test;
+
+
+/**
+ * Tests the Giraph on YARN workflow. Basically, the plan is to use a
+ * <code>MiniYARNCluster</code> to run a small test job through our
+ * GiraphYarnClient -> GiraphApplicationMaster -> GiraphYarnTask (2 no-ops).
+ * No "real" BSP code need be tested here, as it is not aware it is running on
+ * YARN once the job is in progress, so the existing MRv1 BSP tests are fine.
+ */
+public class TestYarnJob implements Watcher {
+  private static final Logger LOG = Logger.getLogger(TestYarnJob.class);
+  /**
+   * Simple No-Op vertex to test if we can run a quick Giraph job on YARN.
+   */
+  private static class DummyYarnVertex extends Vertex<IntWritable, IntWritable,
+      NullWritable, IntWritable> {
+    @Override
+    public void compute(Iterable<IntWritable> messages) throws IOException {
+      voteToHalt();
+    }
+  }
+
+  /** job name for this integration test */
+  private static final String JOB_NAME = "giraph-TestPureYarnJob";
+  /** ZooKeeper port to use for tests, avoiding InternalVertexRunner's port */
+  private static final int LOCAL_ZOOKEEPER_PORT = 22183;
+  /** ZooKeeper list system property */
+  private static final String zkList = "localhost:" + LOCAL_ZOOKEEPER_PORT;
+  /** Local ZK working dir, avoid InternalVertexRunner naming */
+  private static final String zkDirName = "_bspZooKeeperYarn";
+  /** Local ZK Manager working dir, avoid InternalVertexRunner naming */
+  private static final String zkMgrDirName = "_defaultZooKeeperManagerYarn";
+
+  /** Temp ZK base working dir for integration test */
+  private File testBaseDir = null;
+  /** Fake input dir for integration test */
+  private File inputDir = null;
+  /** Fake output dir for integration test */
+  private File outputDir = null;
+  /** Temp ZK working dir for integration test */
+  private File zkDir = null;
+  /** Temp ZK Manager working dir for integration test */
+  private File zkMgrDir = null;
+  /** Internal ZooKeeper instance for integration test run */
+  private InternalZooKeeper zookeeper;
+  /** For running the ZK instance locally */
+  private ExecutorService exec = Executors.newSingleThreadExecutor();
+  /** GiraphConfiguration for a "fake YARN job" */
+  private GiraphConfiguration conf = null;
+  /** Counter for # of znode events during integration test */
+  private int zkEventCount = 0;
+  /** Our YARN test cluster for local integration test */
+  private MiniYARNCluster cluster = null;
+
+  @Test
+  public void testPureYarnJob() {
+    try {
+      setupYarnConfiguration();
+      initLocalZookeeper();
+      initYarnCluster();
+      GiraphYarnClient testGyc = new GiraphYarnClient(conf, JOB_NAME);
+      Assert.assertTrue(testGyc.run(true));
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail("Caught exception in TestYarnJob: " + e);
+    } finally {
+      zookeeper.end();
+      exec.shutdown();
+      cluster.stop();
+      deleteTempDirectories();
+    }
+  }
+
+  /**
+   * Logging this stuff will help you debug integration test issues.
+   * @param zkEvent incoming event for our current test ZK's znode tree.
+   */
+  @Override
+  public void process(WatchedEvent zkEvent) {
+    String event = zkEvent == null ? "NULL" : zkEvent.toString();
+    LOG.info("TestYarnJob observed ZK event: " + event +
+      " for a total of " + (++zkEventCount) + " so far.");
+  }
+
+  /**
+   * Delete our temp dir so checkstyle and rat plugins are happy.
+   */
+  private void deleteTempDirectories() {
+    try {
+      if (testBaseDir != null && testBaseDir.exists()) {
+        FileUtils.deleteDirectory(testBaseDir);
+      }
+    } catch (IOException ioe) {
+      LOG.error("TestYarnJob#deleteTempDirectories() FAIL at: " + testBaseDir);
+    }
+  }
+
+  /**
+   * Initialize a local ZK instance for our test run.
+   */
+  private void initLocalZookeeper() throws IOException {
+    zookeeper = new InternalZooKeeper();
+    exec.execute(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          // Configure a local zookeeper instance
+          Properties zkProperties = generateLocalZkProperties();
+          QuorumPeerConfig qpConfig = new QuorumPeerConfig();
+          qpConfig.parseProperties(zkProperties);
+          // run the zookeeper instance
+          final ServerConfig zkConfig = new ServerConfig();
+          zkConfig.readFrom(qpConfig);
+          zookeeper.runFromConfig(zkConfig);
+        } catch (QuorumPeerConfig.ConfigException qpcce) {
+          throw new RuntimeException("parse of generated ZK config file " +
+                                       "has failed.", qpcce);
+        } catch (IOException e) {
+          e.printStackTrace();
+          throw new RuntimeException("initLocalZookeeper in TestYarnJob: ", e);
+        }
+      }
+
+      /**
+       * Returns pre-created ZK conf properties for Giraph integration test.
+       * @return the populated properties sheet.
+       */
+      Properties generateLocalZkProperties() {
+        Properties zkProperties = new Properties();
+        zkProperties.setProperty("tickTime", "2000");
+        zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
+        zkProperties.setProperty("clientPort",
+                                  String.valueOf(LOCAL_ZOOKEEPER_PORT));
+        zkProperties.setProperty("maxClientCnxns", "10000");
+        zkProperties.setProperty("minSessionTimeout", "10000");
+        zkProperties.setProperty("maxSessionTimeout", "100000");
+        zkProperties.setProperty("initLimit", "10");
+        zkProperties.setProperty("syncLimit", "5");
+        zkProperties.setProperty("snapCount", "50000");
+        return zkProperties;
+      }
+    });
+  }
+
+  /**
+   * Set up the GiraphConfiguration settings we need to run a no-op Giraph
+   * job on a MiniYARNCluster as an integration test. Some YARN-specific
+   * flags are set inside GiraphYarnClient and won't need to be set here.
+   */
+  private void setupYarnConfiguration() throws IOException {
+    conf = new GiraphConfiguration();
+    conf.setWorkerConfiguration(1, 1, 100.0f);
+    conf.setMaxMasterSuperstepWaitMsecs(30 * 1000);
+    conf.setEventWaitMsecs(3 * 1000);
+    conf.setYarnLibJars(""); // no need
+    conf.setYarnTaskHeapMb(256); // small since no work to be done
+    conf.setVertexClass(DummyYarnVertex.class);
+    conf.setVertexInputFormatClass(IntIntNullIntTextInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+    conf.setNumComputeThreads(1);
+    conf.setMaxTaskAttempts(1);
+    conf.setNumInputSplitsThreads(1);
+    // Giraph on YARN only ever thinks it's running in "non-local" mode
+    conf.setLocalTestMode(false);
+    // this has to happen here before we populate the conf with the temp dirs
+    setupTempDirectories();
+    conf.set(OUTDIR, new Path(outputDir.getAbsolutePath()).toString());
+    GiraphFileInputFormat.addVertexInputPath(conf, new Path(inputDir.getAbsolutePath()));
+    // hand off the ZK info we just created to our no-op job
+    GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.set(conf, 500);
+    conf.setZooKeeperConfiguration(zkList);
+    conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.getAbsolutePath());
+    GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf, zkMgrDir.getAbsolutePath());
+    // without this, our "real" client won't connect w/"fake" YARN cluster
+    conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+  }
+
+  /**
+   * Initialize the temp dir tree for ZK and I/O for no-op integration test.
+   */
+  private void setupTempDirectories() throws IOException {
+    try {
+      testBaseDir = new File(System.getProperty("user.dir"), JOB_NAME);
+      if (testBaseDir.exists()) {
+        testBaseDir.delete();
+      }
+      testBaseDir.mkdir();
+      inputDir = new File(testBaseDir, "yarninput");
+      if (inputDir.exists()) {
+        inputDir.delete();
+      }
+      inputDir.mkdir();
+      File inFile = new File(inputDir, "graph_data.txt");
+      inFile.createNewFile();
+      outputDir = new File(testBaseDir, "yarnoutput");
+      if (outputDir.exists()) {
+        outputDir.delete();
+      } // don't actually produce the output dir, let Giraph on YARN do it
+      zkDir = new File(testBaseDir, zkDirName);
+      if (zkDir.exists()) {
+        zkDir.delete();
+      }
+      zkDir.mkdir();
+      zkMgrDir = new File(testBaseDir, zkMgrDirName);
+      if (zkMgrDir.exists()) {
+        zkMgrDir.delete();
+      }
+      zkMgrDir.mkdir();
+    } catch (IOException ioe) {
+      ioe.printStackTrace();
+      throw new IOException("from setupTempDirectories: ", ioe);
+    }
+  }
+
+  /**
+   * Initialize the MiniYARNCluster for the integration test.
+   */
+  private void initYarnCluster() {
+    cluster = new MiniYARNCluster(TestYarnJob.class.getName(), 1, 1, 1);
+    cluster.init(new ImmutableClassesGiraphConfiguration(conf));
+    cluster.start();
+  }
+
+  /**
+   * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown
+   */
+  class InternalZooKeeper extends ZooKeeperServerMain {
+    /**
+     * Shutdown the ZooKeeper instance.
+     */
+    void end() {
+      shutdown();
+    }
+  }
+}
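
Since this test lives under a yarn package, it only compiles and runs when
the hadoop_yarn profile is active (see the POM changes below). A plausible
invocation to exercise it in isolation, with illustrative flags:

  mvn -Phadoop_yarn -Dtest=TestYarnJob test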

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/test/resources/capacity-scheduler.xml
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/resources/capacity-scheduler.xml b/giraph-core/src/test/resources/capacity-scheduler.xml
new file mode 100644
index 0000000..5e19b9a
--- /dev/null
+++ b/giraph-core/src/test/resources/capacity-scheduler.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+
+<configuration>
+
+  <property>
+    <name>yarn.scheduler.capacity.root.queues</name>
+    <value>unfunded,default</value>
+  </property>
+  
+  <property>
+    <name>yarn.scheduler.capacity.root.capacity</name>
+    <value>100</value>
+  </property>
+  
+  <property>
+    <name>yarn.scheduler.capacity.root.unfunded.capacity</name>
+    <value>50</value>
+  </property>
+  
+  <property>
+    <name>yarn.scheduler.capacity.root.default.capacity</name>
+    <value>50</value>
+  </property>
+
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-examples/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-examples/pom.xml b/giraph-examples/pom.xml
index 3b6a08c..21e8ccf 100644
--- a/giraph-examples/pom.xml
+++ b/giraph-examples/pom.xml
@@ -104,6 +104,18 @@ under the License.
       <build>
         <plugins>
           <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+          <plugin>
             <groupId>org.sonatype.plugins</groupId>
             <artifactId>munge-maven-plugin</artifactId>
           </plugin>
@@ -116,6 +128,18 @@ under the License.
       <build>
         <plugins>
           <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+          <plugin>
             <groupId>org.sonatype.plugins</groupId>
             <artifactId>munge-maven-plugin</artifactId>
           </plugin>
@@ -128,6 +152,18 @@ under the License.
       <build>
         <plugins>
           <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+          <plugin>
             <groupId>org.sonatype.plugins</groupId>
             <artifactId>munge-maven-plugin</artifactId>
             <configuration>
@@ -152,6 +188,18 @@ under the License.
       <build>
         <plugins>
           <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+          <plugin>
             <groupId>org.sonatype.plugins</groupId>
             <artifactId>munge-maven-plugin</artifactId>
             <configuration>
@@ -180,6 +228,18 @@ under the License.
       <build>
         <plugins>
           <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+          <plugin>
             <groupId>org.sonatype.plugins</groupId>
             <artifactId>munge-maven-plugin</artifactId>
           </plugin>
@@ -191,24 +251,119 @@ under the License.
       </build>
     </profile>
 
+    <!-- Currently supports hadoop-2.0.3-alpha
+      (see hadoop_yarn profile in giraph-parent POM to change) -->
+    <profile>
+      <id>hadoop_yarn</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.sonatype.plugins</groupId>
+            <artifactId>munge-maven-plugin</artifactId>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+
+    <!-- The profiles below do not (yet) include any munge flags -->
     <profile>
       <id>hadoop_2.0.0</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
 
     <profile>
       <id>hadoop_2.0.1</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
 
     <profile>
       <id>hadoop_2.0.2</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
 
     <profile>
-        <id>hadoop_2.0.3</id>
+      <id>hadoop_2.0.3</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
 
     <profile>
       <id>hadoop_trunk</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
   </profiles>
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 446aa2a..c883d41 100644
--- a/pom.xml
+++ b/pom.xml
@@ -496,7 +496,7 @@ under the License.
                 <exclude>CODE_CONVENTIONS</exclude>
                 <!-- generated content -->
                 <exclude>**/target/**</exclude>
-                <exclude>_bsp/**</exclude>
+                <exclude>/_bsp/**</exclude>
                 <exclude>.checkstyle</exclude>
                 <!-- source control and IDEs -->
                 <exclude>.reviewboardrc</exclude>
@@ -506,7 +506,9 @@ under the License.
                 <exclude>.idea/**</exclude>
                 <exclude>**/*.iml</exclude>
                 <exclude>**/*.ipr</exclude>
-             </excludes>
+                <!-- test resources (for Giraph on YARN profile) -->
+                <exclude>**/test/resources/**</exclude>
+              </excludes>
           </configuration>
         </plugin>
         <plugin>
@@ -749,6 +751,60 @@ under the License.
       </dependencies>
     </profile>
 
+    <!-- This profile runs on Hadoop-2.0.3-alpha by default, but does not
+      use Hadoop MapReduce v2 to set up the Giraph job. This means the Giraph
+      worker/master tasks are not Mappers. Tasks are run in YARN-managed execution
+      containers. Internally, the Giraph framework continues to depend on many Hadoop
+      MapReduce classes to perform work. -->
+    <profile>
+      <id>hadoop_yarn</id>
+      <properties>
+        <hadoop.version>2.0.3-alpha</hadoop.version>
+        <munge.symbols>PURE_YARN</munge.symbols>
+      </properties>
+      <dependencies>
+        <!-- sorted lexicographically -->
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-common</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-common</artifactId>
+          <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-common</artifactId>
+          <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+          <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-nodemanager</artifactId>
+          <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-tests</artifactId>
+          <version>${hadoop.version}</version>
+          <type>test-jar</type>
+        </dependency>
+      </dependencies>
+    </profile>
+
     <!-- Help keep future Hadoop versions munge-free:
          All profiles below are munge-free: avoid introducing any munge
          flags on any of the following profiles. -->
@@ -917,7 +973,7 @@ under the License.
       <dependency>
         <groupId>commons-io</groupId>
         <artifactId>commons-io</artifactId>
-        <version>1.3.2</version>
+        <version>2.1</version>
       </dependency>
       <dependency>
         <groupId>commons-cli</groupId>


[2/2] git commit: updated refs/heads/trunk to b2dff27

Posted by er...@apache.org.
GIRAPH-13: Port Giraph to YARN


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b2dff275
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b2dff275
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b2dff275

Branch: refs/heads/trunk
Commit: b2dff2751d8d3d768f788b39089688c18f6c1750
Parents: 67f5f74
Author: Eli Reisman <er...@apache.org>
Authored: Wed Apr 3 00:15:56 2013 -0400
Committer: Eli Reisman <er...@apache.org>
Committed: Wed Apr 3 00:15:56 2013 -0400

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 checkstyle.xml                                     |    5 +-
 giraph-core/pom.xml                                |  159 ++++-
 .../main/java/org/apache/giraph/GiraphRunner.java  |   15 +-
 .../java/org/apache/giraph/bsp/BspInputFormat.java |    5 +-
 .../apache/giraph/conf/GiraphConfiguration.java    |   65 ++
 .../org/apache/giraph/conf/GiraphConstants.java    |   17 +-
 .../org/apache/giraph/graph/GraphTaskManager.java  |   36 +-
 .../org/apache/giraph/master/BspServiceMaster.java |   24 +-
 .../apache/giraph/utils/ConfigurationUtils.java    |   42 +-
 .../org/apache/giraph/worker/BspServiceWorker.java |   32 +-
 .../giraph/yarn/GiraphApplicationMaster.java       |  699 +++++++++++++++
 .../org/apache/giraph/yarn/GiraphYarnClient.java   |  476 ++++++++++
 .../org/apache/giraph/yarn/GiraphYarnTask.java     |  240 +++++
 .../java/org/apache/giraph/yarn/YarnUtils.java     |  241 +++++
 .../java/org/apache/giraph/yarn/package-info.java  |   22 +
 .../java/org/apache/giraph/yarn/TestYarnJob.java   |  285 ++++++
 .../src/test/resources/capacity-scheduler.xml      |   26 +
 giraph-examples/pom.xml                            |  157 ++++-
 pom.xml                                            |   62 ++-
 20 files changed, 2578 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index e0a82a6..69261d2 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-13: Port Giraph to YARN (ereisman)
+
   GIRAPH-600: Create an option to do output during computation (majakabiljo)
 
   GIRAPH-599: Hive IO dependency issues with some Hadoop profiles (nitay via majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle.xml b/checkstyle.xml
index 370c120..66fd1ad 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -104,7 +104,10 @@
       <!-- Switch statements should be complete and with independent cases -->
     <module name="FallThrough" />
     <module name="MissingSwitchDefault" />
-    <module name="RedundantThrows"/>
+    <!-- For the hadoop_yarn profile, some YARN exceptions fail to load in checkstyle -->
+    <module name="RedundantThrows">
+        <property name="suppressLoadErrors" value="true" />
+    </module>
     <module name="SimplifyBooleanExpression"/>
     <module name="SimplifyBooleanReturn"/>
      <!-- Only one statement per line allowed -->

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-core/pom.xml b/giraph-core/pom.xml
index 3580d0c..2f473ed 100644
--- a/giraph-core/pom.xml
+++ b/giraph-core/pom.xml
@@ -136,6 +136,18 @@ under the License.
       <build>
         <plugins>
           <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+          <plugin>
             <groupId>org.sonatype.plugins</groupId>
             <artifactId>munge-maven-plugin</artifactId>
           </plugin>
@@ -148,6 +160,18 @@ under the License.
       <build>
         <plugins>
           <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+          <plugin>
             <groupId>org.sonatype.plugins</groupId>
             <artifactId>munge-maven-plugin</artifactId>
           </plugin>
@@ -160,6 +184,18 @@ under the License.
       <build>
         <plugins>
           <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+          <plugin>
             <groupId>org.sonatype.plugins</groupId>
             <artifactId>munge-maven-plugin</artifactId>
             <configuration>
@@ -184,6 +220,18 @@ under the License.
       <build>
         <plugins>
           <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+          <plugin>
             <groupId>org.sonatype.plugins</groupId>
             <artifactId>munge-maven-plugin</artifactId>
             <configuration>
@@ -212,6 +260,18 @@ under the License.
       <build>
         <plugins>
           <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+          <plugin>
             <groupId>org.sonatype.plugins</groupId>
             <artifactId>munge-maven-plugin</artifactId>
           </plugin>
@@ -223,24 +283,121 @@ under the License.
       </build>
     </profile>
 
+    <!-- Currently supports hadoop-2.0.3-alpha
+      (see hadoop_yarn profile in giraph-parent POM to change) -->
+    <profile>
+      <id>hadoop_yarn</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.sonatype.plugins</groupId>
+            <artifactId>munge-maven-plugin</artifactId>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+
+    <!-- Unmunged profiles are below. -->
+
     <profile>
       <id>hadoop_2.0.0</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
 
     <profile>
       <id>hadoop_2.0.1</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+
     </profile>
 
     <profile>
-      <id>hadoop_2.0.2</id>
+        <id>hadoop_2.0.2</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
 
     <profile>
       <id>hadoop_2.0.3</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
  
     <profile>
       <id>hadoop_trunk</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
   </profiles>
 

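Note on the profile wiring above: the hadoop_yarn profile runs the munge-maven-plugin to activate YARN-only code paths, while every other profile simply excludes **/yarn/** from compilation. As a minimal sketch (assuming munge's standard if/else/end comment directives, with the PURE_YARN symbol defined only by the hadoop_yarn profile), a guarded block looks like:

    /*if[PURE_YARN]
    // active only when munge runs with the PURE_YARN symbol defined
    import org.apache.giraph.yarn.GiraphYarnClient;
    end[PURE_YARN]*/

On any other profile the block stays commented out, so the YARN classes never need to be on the compile classpath.
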
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
index 5bd5686..1bd79b5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
@@ -21,6 +21,9 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.giraph.utils.ConfigurationUtils;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.job.GiraphJob;
+/*if[PURE_YARN]
+import org.apache.giraph.yarn.GiraphYarnClient;
+end[PURE_YARN]*/
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.Path;
@@ -64,16 +67,26 @@ public class GiraphRunner implements Tool {
    * @return job run exit code
    */
   public int run(String[] args) throws Exception {
+    if (null == getConf()) { // for YARN profile
+      conf = new Configuration();
+    }
     GiraphConfiguration giraphConf = new GiraphConfiguration(getConf());
     CommandLine cmd = ConfigurationUtils.parseArgs(giraphConf, args);
     if (null == cmd) {
       return 0; // user requested help/info printout, don't run a job.
     }
 
+    // set up job for various platforms
     final String vertexClassName = args[0];
-    GiraphJob job = new GiraphJob(giraphConf, "Giraph: " + vertexClassName);
+    final String jobName = "Giraph: " + vertexClassName;
+    /*if[PURE_YARN]
+    GiraphYarnClient job = new GiraphYarnClient(giraphConf, jobName);
+    else[PURE_YARN]*/
+    GiraphJob job = new GiraphJob(giraphConf, jobName);
     prepareHadoopMRJob(job, cmd);
+    /*end[PURE_YARN]*/
 
+    // run the job, collect results
     if (LOG.isDebugEnabled()) {
       LOG.debug("Attempting to run Vertex: " + vertexClassName);
     }

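For context, the YARN-profile path through this runner can also be exercised programmatically. A minimal sketch, assuming the hadoop_yarn build and hypothetical jar/class names (a real job would also set the vertex class and input/output formats, e.g. via ConfigurationUtils):

    import org.apache.giraph.conf.GiraphConfiguration;
    import org.apache.giraph.yarn.GiraphYarnClient;

    public class YarnLaunchSketch {
      public static void main(String[] args) throws Exception {
        GiraphConfiguration conf = new GiraphConfiguration();
        conf.setYarnLibJars("giraph-core.jar,my-app.jar"); // jars to ship
        conf.setYarnTaskHeapMb(2048);                      // per-task heap, MB
        GiraphYarnClient job = new GiraphYarnClient(conf, "Giraph: MyVertex");
        System.exit(job.run(true) ? 0 : -1);
      }
    }
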
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
index cc53271..8f88c80 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
@@ -52,13 +52,16 @@ public class BspInputFormat extends InputFormat<Text, Text> {
     int maxWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, 0);
     boolean splitMasterWorker = GiraphConstants.SPLIT_MASTER_WORKER.get(conf);
     int maxTasks = maxWorkers;
-    if (splitMasterWorker) {
+    // if this is a YARN job, separate ZK should already be running
+    boolean isYarnJob = GiraphConstants.IS_PURE_YARN_JOB.get(conf);
+    if (splitMasterWorker && !isYarnJob) {
       int zkServers = GiraphConstants.ZOOKEEPER_SERVER_COUNT.get(conf);
       maxTasks += zkServers;
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("getMaxTasks: Max workers = " + maxWorkers +
           ", split master/worker = " + splitMasterWorker +
+          ", is YARN-only job = " + isYarnJob +
           ", total max tasks = " + maxTasks);
     }
     return maxTasks;

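A quick worked example of the getMaxTasks() arithmetic above, assuming maxWorkers = 4, splitMasterWorker = true, and one ZooKeeper server:

    int maxTasks = 4;  // maxWorkers
    // MRv1/MRv2 job: Giraph must host ZK itself -> 4 + 1 = 5 max tasks
    // pure YARN job: ZK already runs separately -> stays at 4 max tasks
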
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 040c26f..8a78313 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -434,6 +434,10 @@ public class GiraphConfiguration extends Configuration
     set(ZOOKEEPER_LIST, serverList);
   }
 
+  /**
+   * Getter for SPLIT_MASTER_WORKER flag.
+   * @return boolean flag value.
+   */
   public final boolean getSplitMasterWorker() {
     return SPLIT_MASTER_WORKER.get(this);
   }
@@ -475,6 +479,50 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Is this a "pure YARN" Giraph job, or is a MapReduce layer (v1 or v2)
+   * actually managing our cluster nodes, i.e. each task is a Mapper.
+   * @return TRUE if this is a pure YARN job.
+   */
+  public boolean isPureYarnJob() {
+    return IS_PURE_YARN_JOB.get(this);
+  }
+
+  /**
+   * Jars required in "Pure YARN" jobs (names only, no paths) should
+   * be listed here in full, including Giraph framework jar(s).
+   * @return the comma-separated list of jar names for export to cluster.
+   */
+  public String getYarnLibJars() {
+    return GIRAPH_YARN_LIBJARS.get(this);
+  }
+
+  /**
+   * Populate jar list for Pure YARN jobs.
+   * @param jarList a comma-separated list of jar names
+   */
+  public void setYarnLibJars(String jarList) {
+    GIRAPH_YARN_LIBJARS.set(this, jarList);
+  }
+
+  /**
+   * Get heap size (in MB) for each task in our Giraph job run,
+   * assuming this job will run on the "pure YARN" profile.
+   * @return the heap size for all tasks, in MB
+   */
+  public int getYarnTaskHeapMb() {
+    return GIRAPH_YARN_TASK_HEAP_MB.get(this);
+  }
+
+  /**
+   * Set heap size for Giraph tasks in our job run, assuming
+   * the job will run on the "pure YARN" profile.
+   * @param heapMb the heap size for all tasks
+   */
+  public void setYarnTaskHeapMb(int heapMb) {
+    GIRAPH_YARN_TASK_HEAP_MB.set(this, heapMb);
+  }
+
+  /**
    * Get the ZooKeeper list.
    *
    * @return ZooKeeper list of strings, comma separated or null if none set.
@@ -496,10 +544,27 @@ public class GiraphConfiguration extends Configuration
     return LOG_THREAD_LAYOUT.get(this);
   }
 
+  /**
+   * Is this job run a local test?
+   * @return the test status as recorded in the Configuration
+   */
   public boolean getLocalTestMode() {
     return LOCAL_TEST_MODE.get(this);
   }
 
+  /**
+   * Flag this job as a local test run.
+   * @param flag the test status for this job
+   */
+  public void setLocalTestMode(boolean flag) {
+    LOCAL_TEST_MODE.set(this, flag);
+  }
+
+  /**
+   * The number of server tasks in our ZK quorum for
+   * this job run.
+   * @return the number of ZK servers in the quorum
+   */
   public int getZooKeeperServerCount() {
     return ZOOKEEPER_SERVER_COUNT.get(this);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index eaa8363..730fa5b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -141,8 +141,21 @@ public interface GiraphConstants {
   BooleanConfOption VERTEX_OUTPUT_FORMAT_THREAD_SAFE =
       new BooleanConfOption("giraph.vertexOutputFormatThreadSafe", false);
 
-  /** Output Format Path (for Giraph-on-YARN) */
-  String GIRAPH_OUTPUT_DIR = "giraph.output.dir";
+  /** conf key for comma-separated list of jars to export to YARN workers */
+  StrConfOption GIRAPH_YARN_LIBJARS =
+    new StrConfOption("giraph.yarn.libjars", "");
+  /** Name of the XML file that will export our Configuration to YARN workers */
+  String GIRAPH_YARN_CONF_FILE = "giraph-conf.xml";
+  /** Giraph default heap size for all tasks when running on YARN profile */
+  int GIRAPH_YARN_TASK_HEAP_MB_DEFAULT = 1024;
+  /** Name of Giraph property for user-configurable heap memory per worker */
+  IntConfOption GIRAPH_YARN_TASK_HEAP_MB = new IntConfOption(
+    "giraph.yarn.task.heap.mb", GIRAPH_YARN_TASK_HEAP_MB_DEFAULT);
+  /** Default priority level in YARN for our task containers */
+  int GIRAPH_YARN_PRIORITY = 10;
+  /** Is this a pure YARN job (i.e. no MapReduce layer managing Giraph tasks) */
+  BooleanConfOption IS_PURE_YARN_JOB =
+    new BooleanConfOption("giraph.pure.yarn.job", false);
 
   /** Vertex index class */
   ClassConfOption<WritableComparable> VERTEX_ID_CLASS =

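These constants follow Giraph's typed ConfOption pattern, so the GiraphConfiguration accessors in the previous file are thin wrappers around option get/set calls. A small sketch of direct option access (the values shown are the defaults declared above):

    GiraphConfiguration conf = new GiraphConfiguration();
    boolean pureYarn = GiraphConstants.IS_PURE_YARN_JOB.get(conf);   // false
    int heapMb = GiraphConstants.GIRAPH_YARN_TASK_HEAP_MB.get(conf); // 1024
    GiraphConstants.GIRAPH_YARN_TASK_HEAP_MB.set(conf, 2048);
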
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 3ae5ed3..8ed44e8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -94,10 +94,11 @@ import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_CLASS;
 public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   E extends Writable, M extends Writable> implements
   ResetSuperstepMetricsObserver {
-  static {
+  /*if_not[PURE_YARN]
+  static { // Eliminate this? Even MRv1 tasks should not need it here.
     Configuration.addDefaultResource("giraph-site.xml");
   }
-
+  end[PURE_YARN]*/
   /** Name of metric for superstep time in msec */
   public static final String TIMER_SUPERSTEP_TIME = "superstep-time-ms";
   /** Name of metric for compute on all vertices in msec */
@@ -157,6 +158,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   private String serverPortList;
   /** The Hadoop Mapper#Context for this job */
   private Mapper<?, ?, ?, ?>.Context context;
+  /** is this GraphTaskManager the master? */
+  private boolean isMaster;
 
   /**
    * Default constructor for GiraphTaskManager.
@@ -165,6 +168,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
    */
   public GraphTaskManager(Mapper<?, ?, ?, ?>.Context context) {
     this.context = context;
+    this.isMaster = false;
   }
 
   /**
@@ -174,9 +178,9 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   public void setup(Path[] zkPathList)
     throws IOException, InterruptedException {
     context.setStatus("setup: Beginning worker setup.");
-    determineClassTypes(context.getConfiguration());
     conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
       context.getConfiguration());
+    determineClassTypes(conf);
     // configure global logging level for Giraph job
     initializeAndConfigureLogging();
     // init the metrics objects
@@ -302,6 +306,23 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   }
 
   /**
+   * Sets the "isMaster" flag for final output commit to happen on master.
+   * @param im the boolean input to set isMaster. Applies to "pure YARN" jobs only.
+   */
+  public void setIsMaster(final boolean im) {
+    this.isMaster = im;
+  }
+
+  /**
+   * Get "isMaster" status flag -- we need to know if we're the master in the
+   * "finally" block of our GiraphYarnTask#execute() to commit final job output.
+   * @return true if this task IS the master.
+   */
+  public boolean isMaster() {
+    return isMaster;
+  }
+
+  /**
    * Produce a reference to the "start" superstep timer for the current
    * superstep.
    * @param superstep the current superstep count
@@ -455,7 +476,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
 
   /**
    * Copied from JobConf to get the location of this jar.  Workaround for
-   * things like Oozie map-reduce jobs.
+   * things like Oozie map-reduce jobs. NOTE: Pure YARN profile cannot
+   * make use of this, as the jars are unpacked at each container site.
    *
    * @param myClass Class to search the class loader path for to locate
    *        the relevant jar file
@@ -574,7 +596,6 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   private void locateZookeeperClasspath(Path[] fileClassPaths)
     throws IOException {
     if (!conf.getLocalTestMode()) {
-      //Path[] fileClassPaths = DistributedCache.getLocalCacheArchives(conf);
       String zkClasspath = null;
       if (fileClassPaths == null) {
         if (LOG.isInfoEnabled()) {
@@ -584,7 +605,10 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
         if (jarFile == null) {
           jarFile = findContainingJar(getClass());
         }
-        zkClasspath = jarFile.replaceFirst("file:", "");
+        // Pure YARN profiles will use unpacked resources, so calls
+        // to "findContainingJar()" in that context can return NULL!
+        zkClasspath = null == jarFile ?
+          "./*" : jarFile.replaceFirst("file:", "");
       } else {
         StringBuilder sb = new StringBuilder();
         sb.append(fileClassPaths[0]);

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 404e47e..9f4bcbf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -344,15 +344,21 @@ public class BspServiceMaster<I extends WritableComparable,
     LOG.fatal("failJob: Killing job " + getJobId());
     LOG.fatal("failJob: exception " + e.toString());
     try {
-      @SuppressWarnings("deprecation")
-      org.apache.hadoop.mapred.JobClient jobClient =
+      if (getConfiguration().isPureYarnJob()) {
+        throw new RuntimeException(
+          "BspServiceMaster (YARN profile) is " +
+          "FAILING this task, throwing exception to end job run.", e);
+      } else {
+        @SuppressWarnings("deprecation")
+        org.apache.hadoop.mapred.JobClient jobClient =
           new org.apache.hadoop.mapred.JobClient(
-              (org.apache.hadoop.mapred.JobConf)
-              getContext().getConfiguration());
-      @SuppressWarnings("deprecation")
-      JobID jobId = JobID.forName(getJobId());
-      RunningJob job = jobClient.getJob(jobId);
-      job.killJob();
+            (org.apache.hadoop.mapred.JobConf)
+            getContext().getConfiguration());
+        @SuppressWarnings("deprecation")
+        JobID jobId = JobID.forName(getJobId());
+        RunningJob job = jobClient.getJob(jobId);
+        job.killJob();
+      }
     } catch (IOException ioe) {
       throw new RuntimeException(ioe);
     } finally {
@@ -1737,6 +1743,7 @@ public class BspServiceMaster<I extends WritableComparable,
     }
 
     if (isMaster) {
+      getGraphTaskManager().setIsMaster(true);
       cleanUpZooKeeper();
       // If desired, cleanup the checkpoint directory
       if (GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.get(conf)) {
@@ -1750,7 +1757,6 @@ public class BspServiceMaster<I extends WritableComparable,
         }
       }
       aggregatorHandler.close();
-
       masterClient.closeConnections();
       masterServer.close();
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index bd30455..9ebe693 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -17,6 +17,9 @@
  */
 package org.apache.giraph.utils;
 
+/*if[PURE_YARN]
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
+end[PURE_YARN]*/
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 import org.apache.commons.cli.BasicParser;
@@ -41,6 +44,8 @@ import org.apache.giraph.partition.Partition;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.edge.VertexEdges;
 import org.apache.giraph.worker.WorkerContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -57,16 +62,33 @@ public final class ConfigurationUtils {
   private static final Logger LOG =
     Logger.getLogger(ConfigurationUtils.class);
   /** The base path for output dirs as saved in GiraphConfiguration */
-  private static final Path BASE_OUTPUT_DIR =
-    new Path("hdfs://user/" + System.getenv("USER"));
+  private static final Path BASE_OUTPUT_PATH;
+  static {
+    // whether local or remote, if there are no *-site.xml files, we're done
+    try {
+      BASE_OUTPUT_PATH = FileSystem.get(new Configuration()).getHomeDirectory();
+    } catch (IOException ioe) {
+      throw new IllegalStateException("Error locating default base path!", ioe);
+    }
+  }
   /** Maintains our accepted options in case the caller wants to add some */
   private static Options OPTIONS;
+  /*if_not[PURE_YARN]
+  private static String OUTDIR = ""; // no-op placeholder for non-YARN builds
+  end[PURE_YARN]*/
+
   static {
     OPTIONS = new Options();
     OPTIONS.addOption("h", "help", false, "Help");
     OPTIONS.addOption("la", "listAlgorithms", false, "List supported " +
         "algorithms");
     OPTIONS.addOption("q", "quiet", false, "Quiet output");
+    OPTIONS.addOption("yj", "yarnjars", true, "comma-separated list of JAR " +
+      "filenames to distribute to Giraph tasks and ApplicationMaster. " +
+      "YARN only. Search order: CLASSPATH, HADOOP_HOME, user current dir.");
+    OPTIONS.addOption("yh", "yarnheap", true, "Heap size, in MB, for each " +
+      "Giraph task (YARN only.) Defaults to " +
+      GiraphConstants.GIRAPH_YARN_TASK_HEAP_MB + " MB.");
     OPTIONS.addOption("w", "workers", true, "Number of workers");
     OPTIONS.addOption("vif", "vertexInputFormat", true, "Vertex input format");
     OPTIONS.addOption("eif", "edgeInputFormat", true, "Edge input format");
@@ -304,11 +326,20 @@ public final class ConfigurationUtils {
         }
       }
     }
+    // YARN-ONLY OPTIONS
+    if (cmd.hasOption("yj")) {
+      giraphConfiguration.setYarnLibJars(cmd.getOptionValue("yj"));
+    }
+    if (cmd.hasOption("yh")) {
+      giraphConfiguration.setYarnTaskHeapMb(
+        Integer.parseInt(cmd.getOptionValue("yh")));
+    }
     if (cmd.hasOption("of")) {
       if (cmd.hasOption("op")) {
-        Path outputDir = new Path(BASE_OUTPUT_DIR, cmd.getOptionValue("op"));
-        giraphConfiguration.set(
-          GiraphConstants.GIRAPH_OUTPUT_DIR, outputDir.toString());
+        Path outputDir = new Path(BASE_OUTPUT_PATH, cmd.getOptionValue("op"));
+        outputDir = // qualify the path so YARN tasks can find it w/o a Job obj
+          outputDir.getFileSystem(giraphConfiguration).makeQualified(outputDir);
+        giraphConfiguration.set(OUTDIR, outputDir.toString());
       } else {
         if (LOG.isInfoEnabled()) {
           LOG.info("No output path specified. Ensure your OutputFormat " +
@@ -316,6 +347,7 @@ public final class ConfigurationUtils {
         }
       }
     }
+    // END YARN-ONLY OPTIONS
   }
 
   /**

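A sketch of how the new flags flow through parseArgs (argument values hypothetical; a real invocation also needs input/output format options):

    String[] args = {
      "org.example.MyComputation",          // vertex class (hypothetical)
      "-w", "4",
      "-yj", "giraph-core.jar,my-app.jar",  // YARN only: jars to ship
      "-yh", "2048",                        // YARN only: per-task heap in MB
    };
    GiraphConfiguration conf = new GiraphConfiguration();
    CommandLine cmd = ConfigurationUtils.parseArgs(conf, args);
    // conf.getYarnLibJars()    -> "giraph-core.jar,my-app.jar"
    // conf.getYarnTaskHeapMb() -> 2048
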
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 35db999..2ea91b5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.CreateMode;
@@ -978,9 +979,30 @@ else[HADOOP_NON_SECURE]*/
       getContext().progress();
       ++partitionIndex;
     }
-    vertexWriter.close(getContext());
+    vertexWriter.close(getContext()); // the temp results are saved now
     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
-        "saveVertices: Done saving vertices");
+      "saveVertices: Done saving vertices.");
+    // YARN: we must commit the "task" output ourselves, Hadoop isn't there.
+    if (getConfiguration().isPureYarnJob() &&
+      getConfiguration().getVertexOutputFormatClass() != null) {
+      try {
+        OutputCommitter outputCommitter =
+          vertexOutputFormat.getOutputCommitter(getContext());
+        if (outputCommitter.needsTaskCommit(getContext())) {
+          LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+            "OutputCommitter: committing task output.");
+          // transfer from temp dirs to "task commit" dirs to prep for
+          // the master's OutputCommitter#commitJob(context) call to finish.
+          outputCommitter.commitTask(getContext());
+        }
+      } catch (InterruptedException ie) {
+        LOG.error("Interrupted while attempting to obtain " +
+          "OutputCommitter.", ie);
+      } catch (IOException ioe) {
+        LOG.error("Master task's attempt to commit output has " +
+          "FAILED.", ioe);
+      }
+    }
   }
 
   @Override
@@ -1403,6 +1425,12 @@ else[HADOOP_NON_SECURE]*/
             "to see if it needs to restart");
       }
       JSONObject jsonObj = getJobState();
+      // in YARN we commit our own output in two stages, with no MapReduce
+      // layer managing job state, so jsonObj can come back null here.
+      if (getConfiguration().isPureYarnJob() && null == jsonObj) {
+        LOG.error("BspServiceWorker#getJobState() came back NULL.");
+        return false; // the event has been processed.
+      }
       try {
         if ((ApplicationState.valueOf(jsonObj.getString(JSONOBJ_STATE_KEY)) ==
             ApplicationState.START_SUPERSTEP) &&

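The commitTask() call above is only stage one of the YARN output commit; stage two is a master-side commitJob(). A minimal sketch of that counterpart, assuming it runs in the master task's cleanup path once GraphTaskManager#isMaster() reports true (variable names and exact placement are up to GiraphYarnTask):

    if (conf.isPureYarnJob() && graphTaskManager.isMaster()) {
      OutputCommitter committer =
        vertexOutputFormat.getOutputCommitter(taskAttemptContext);
      committer.commitJob(taskAttemptContext); // finalize committed task dirs
    }
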
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphApplicationMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphApplicationMaster.java b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphApplicationMaster.java
new file mode 100644
index 0000000..c2b88a0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphApplicationMaster.java
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.yarn;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import com.google.common.collect.Maps;
+import java.security.PrivilegedAction;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords
+  .FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords
+  .RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The YARN Application Master for Giraph is launched when the GiraphYarnClient
+ * successfully requests an execution container from the Resource Manager. The
+ * Application Master is provided by Giraph to manage all requests for resources
+ * (worker nodes, memory, jar files, job configuration metadata, etc.) that
+ * Giraph will need to perform the job. When Giraph runs in a non-YARN context,
+ * the role of the Application Master is played by Hadoop when it launches our
+ * GraphMappers (worker/master task nodes) to run the job.
+ */
+public class GiraphApplicationMaster {
+  /** Logger */
+  private static final Logger LOG =
+    Logger.getLogger(GiraphApplicationMaster.class);
+  /** Exit code for YARN containers that were manually killed/aborted */
+  private static final int YARN_ABORT_EXIT_STATUS = -100;
+  /** Exit code for successfully run YARN containers */
+  private static final int YARN_SUCCESS_EXIT_STATUS = 0;
+  /** millis to sleep between heartbeats during long loops */
+  private static final int SLEEP_BETWEEN_HEARTBEATS_MSECS = 900;
+  /** A reusable map of resources already in HDFS, which each task copies to
+   * its local env and uses to launch its GiraphYarnTask. */
+  private static Map<String, LocalResource> LOCAL_RESOURCES;
+  /** Initialize the Configuration class with the resource file exported by
+   * the YarnClient. We will need to export this resource to the tasks also.
+   * Construct the HEARTBEAT to use to ping the RM about job progress/health.
+   */
+  static {
+    // pick up new conf XML file and populate it with stuff exported from client
+    Configuration.addDefaultResource(GiraphConstants.GIRAPH_YARN_CONF_FILE);
+  }
+
+  /** Handle to AppMaster's RPC connection to YARN and the RM. */
+  private final AMRMProtocol resourceManager;
+  /** bootstrap handle to YARN RPC service */
+  private final YarnRPC rpc;
+  /** GiraphApplicationMaster's application attempt id */
+  private final ApplicationAttemptId appAttemptId;
+  /** GiraphApplicationMaster container id. Leave me here, I'm very useful */
+  private final ContainerId containerId;
+  /** number of containers Giraph needs (conf.getMaxWorkers() + 1 master) */
+  private final int containersToLaunch;
+  /** MB of JVM heap per Giraph task container */
+  private final int heapPerContainer;
+  /** Giraph configuration for this job, transported here by YARN framework */
+  private final ImmutableClassesGiraphConfiguration giraphConf;
+  /** Completed Containers Counter */
+  private final AtomicInteger completedCount;
+  /** Failed Containers Counter */
+  private final AtomicInteger failedCount;
+  /** Number of containers allocated so far (goal: '-w' workers + 1 master) */
+  private final AtomicInteger allocatedCount;
+  /** Number of successfully completed containers in this job run. */
+  private final AtomicInteger successfulCount;
+  /** the ACK #'s for AllocateRequests + heartbeats == last response # */
+  private AtomicInteger lastResponseId;
+  /** Executor to attempt asynchronous launches of Giraph containers */
+  private ExecutorService executor;
+  /** YARN progress is a <code>float</code> between 0.0f and 1.0f */
+  private float progress;
+  /** An empty resource request with which to send heartbeats + progress */
+  private AllocateRequest heartbeat;
+
+  /**
+   * Construct the GiraphAppMaster, populate fields using env vars
+   * set up by YARN framework in this execution container.
+   * @param cId the ContainerId
+   * @param aId the ApplicationAttemptId
+   */
+  protected GiraphApplicationMaster(ContainerId cId, ApplicationAttemptId aId)
+    throws IOException {
+    containerId = cId; // future good stuff will need me to operate.
+    appAttemptId = aId;
+    progress = 0.0f;
+    lastResponseId = new AtomicInteger(0);
+    giraphConf =
+      new ImmutableClassesGiraphConfiguration(new GiraphConfiguration());
+    completedCount = new AtomicInteger(0);
+    failedCount = new AtomicInteger(0);
+    allocatedCount = new AtomicInteger(0);
+    successfulCount = new AtomicInteger(0);
+    rpc = YarnRPC.create(giraphConf);
+    resourceManager = getHandleToRm();
+    containersToLaunch = giraphConf.getMaxWorkers() + 1;
+    executor = Executors.newFixedThreadPool(containersToLaunch);
+    heapPerContainer = giraphConf.getYarnTaskHeapMb();
+  }
+
+  /**
+   * Coordinates all requests for Giraph's worker/master task containers, and
+   * manages application liveness heartbeat, completion status, teardown, etc.
+   */
+  private void run() {
+    // register Application Master with the YARN Resource Manager so we can
+    // begin requesting resources. The response contains useful cluster info.
+    try {
+      resourceManager.registerApplicationMaster(getRegisterAppMasterRequest());
+    } catch (IOException ioe) {
+      throw new IllegalStateException(
+        "GiraphApplicationMaster failed to register with RM.", ioe);
+    }
+
+    try {
+      // make the request only ONCE; only request more on container failure etc.
+      AMResponse amResponse = sendAllocationRequest();
+      logClusterResources(amResponse);
+      // loop here, waiting for TOTAL # REQUESTED containers to be available
+      // and launch them piecemeal as they are reported in heartbeat pings.
+      launchContainersAsynchronously(amResponse);
+      // wait for the containers to finish & tally success/fails
+      awaitJobCompletion(); // all launched tasks are done before complete call
+    } finally {
+      // if we get here w/o problems, the executor is already long finished.
+      if (null != executor && !executor.isTerminated()) {
+        executor.shutdownNow(); // force kill, especially if got here by throw
+      }
+      // When the application completes, it should send a "finish request" to RM
+      try {
+        resourceManager.finishApplicationMaster(buildFinishAppMasterRequest());
+      } catch (YarnRemoteException yre) {
+        LOG.error("GiraphApplicationMaster failed to un-register with RM", yre);
+      }
+      if (null != rpc) {
+        rpc.stopProxy(resourceManager, giraphConf);
+      }
+    }
+  }
+
+  /**
+   * Reports the cluster resources in the AM response to our initial ask.
+   * @param amResponse the AM response from YARN.
+   */
+  private void logClusterResources(final AMResponse amResponse) {
+    // Check what the current available resources in the cluster are
+    Resource availableResources = amResponse.getAvailableResources();
+    LOG.info("Initial Giraph resource request for " + containersToLaunch +
+      " containers has been submitted. " +
+      "The RM reports cluster headroom is: " + availableResources);
+  }
+
+  /**
+   * Utility to build the final "job run is finished" request to the RM.
+   * @return the finish app master request, to send to the RM.
+   */
+  private FinishApplicationMasterRequest buildFinishAppMasterRequest() {
+    LOG.info("Application completed. Signalling finish to RM");
+    FinishApplicationMasterRequest finishRequest =
+      Records.newRecord(FinishApplicationMasterRequest.class);
+    finishRequest.setAppAttemptId(appAttemptId);
+    FinalApplicationStatus appStatus;
+    String appMessage = "Container Diagnostics: " +
+      " allocated=" + allocatedCount.get() +
+      ", completed=" + completedCount.get() +
+      ", succeeded=" + successfulCount.get() +
+      ", failed=" + failedCount.get();
+    if (successfulCount.get() == containersToLaunch) {
+      appStatus = FinalApplicationStatus.SUCCEEDED;
+    } else {
+      appStatus = FinalApplicationStatus.FAILED;
+    }
+    finishRequest.setDiagnostics(appMessage);
+    finishRequest.setFinishApplicationStatus(appStatus);
+    return finishRequest;
+  }
+
+  /**
+   * Loop and check the status of the containers until all are finished,
+   * logging how each container meets its end: success, error, or abort.
+   */
+  private void awaitJobCompletion() {
+    List<ContainerStatus> completedContainers;
+    do {
+      try {
+        Thread.sleep(SLEEP_BETWEEN_HEARTBEATS_MSECS);
+      } catch (InterruptedException ignored) {
+        final int notFinished = containersToLaunch - completedCount.get();
+        LOG.info("GiraphApplicationMaster interrupted from sleep while " +
+          " waiting for " + notFinished + "containers to finish job.");
+      }
+      updateProgress();
+      completedContainers =
+          sendHeartbeat().getAMResponse().getCompletedContainersStatuses();
+      for (ContainerStatus containerStatus : completedContainers) {
+        LOG.info("Got container status for containerID= " +
+          containerStatus.getContainerId() +
+          ", state=" + containerStatus.getState() +
+          ", exitStatus=" + containerStatus.getExitStatus() +
+          ", diagnostics=" + containerStatus.getDiagnostics());
+        switch (containerStatus.getExitStatus()) {
+        case YARN_SUCCESS_EXIT_STATUS:
+          successfulCount.incrementAndGet();
+          break;
+        case YARN_ABORT_EXIT_STATUS:
+          break; // not success or fail
+        default:
+          failedCount.incrementAndGet();
+          break;
+        }
+        completedCount.incrementAndGet();
+      } // end completion check loop
+    } while (completedCount.get() < containersToLaunch);
+  }
+
+  /** Update the progress value for our next heartbeat (allocate request) */
+  private void updateProgress() {
+    // set progress to "half done + ratio of completed containers so far"
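+    // e.g. 4 of 10 containers complete: ratio = 0.4f, progress = 0.7f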
+    final float ratio = completedCount.get() / (float) containersToLaunch;
+    progress = 0.5f + ratio / 2.0f;
+  }
+
+  /**
+   * Loop while checking container request status, handing each new bundle
+   * of allocated containers to our executor, which launches a Giraph BSP
+   * task on each. Giraph's full resource request was sent ONCE, but these
+   * containers will become available in groups, over a period of time.
+   * @param amResponse metadata about our AllocateRequest's results.
+   */
+  private void launchContainersAsynchronously(AMResponse amResponse) {
+    List<Container> allocatedContainers;
+    do {
+      // get fresh report on # alloc'd containers, sleep between checks
+      if (null == amResponse) {
+        amResponse = sendHeartbeat().getAMResponse();
+      }
+      allocatedContainers = amResponse.getAllocatedContainers();
+      allocatedCount.addAndGet(allocatedContainers.size());
+      LOG.info("Waiting for task containers: " + allocatedCount.get() +
+        " allocated out of " + containersToLaunch + " required.");
+      startContainerLaunchingThreads(allocatedContainers);
+      amResponse = null;
+      try {
+        Thread.sleep(SLEEP_BETWEEN_HEARTBEATS_MSECS);
+      } catch (InterruptedException ignored) {
+        LOG.info("launchContainerAsynchronously() raised InterruptedException");
+      }
+    } while (containersToLaunch > allocatedCount.get());
+  }
+
+  /**
+   * For each container successfully allocated, attempt to set up and launch
+   * a Giraph worker/master task.
+   * @param allocatedContainers the containers we have currently allocated.
+   */
+  private void startContainerLaunchingThreads(final List<Container>
+    allocatedContainers) {
+    progress = allocatedCount.get() / (2.0f * containersToLaunch);
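+    // container allocation maps to the first half (0.0f-0.5f) of reported
+    // progress; updateProgress() covers the rest as containers complete.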
+    for (Container allocatedContainer : allocatedContainers) {
+      LOG.info("Launching shell command on a new container" +
+        ", containerId=" + allocatedContainer.getId() +
+        ", containerNode=" + allocatedContainer.getNodeId().getHost() +
+        ":" + allocatedContainer.getNodeId().getPort() +
+        ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() +
+        ", containerState=" + allocatedContainer.getState() +
+        ", containerResourceMemory=" +
+        allocatedContainer.getResource().getMemory());
+      // Launch and start the container on a separate thread to keep the main
+      // thread unblocked as all containers may not be allocated at one go.
+      LaunchContainerRunnable launchThread =
+        new LaunchContainerRunnable(allocatedContainer, heapPerContainer);
+      executor.execute(launchThread);
+    }
+  }
+
+  /**
+   * Sends heartbeat messages that include progress amounts. These are in the
+   * form of a YARN AllocateRequest object that asks for 0 resources.
+   * @return the AllocateResponse, which we may or may not need.
+   */
+  private AllocateResponse sendHeartbeat() {
+    heartbeat.setProgress(progress);
+    heartbeat.setResponseId(lastResponseId.incrementAndGet());
+    AllocateResponse allocateResponse = null;
+    try {
+      allocateResponse = resourceManager.allocate(heartbeat);
+      final int responseId = allocateResponse.getAMResponse().getResponseId();
+      if (responseId != lastResponseId.get()) {
+        lastResponseId.set(responseId);
+      }
+      checkForRebootFlag(allocateResponse.getAMResponse());
+      return allocateResponse;
+    } catch (YarnRemoteException yre) {
+      throw new IllegalStateException("sendHeartbeat() failed with " +
+        "YarnRemoteException: ", yre);
+    }
+  }
+
+  /**
+   * Compose and send the allocation request for our Giraph BSP worker/master
+   * compute nodes. Right now the requested containers are identical, mirroring
+   * Giraph's behavior when running on Hadoop MRv1. Giraph could use YARN
+   * to set fine-grained capabilities per container, including host choice.
+   * @return The AM resource descriptor with our container allocations.
+   */
+  private AMResponse sendAllocationRequest() {
+    AllocateRequest allocRequest = Records.newRecord(AllocateRequest.class);
+    try {
+      List<ResourceRequest> containerList = buildResourceRequests();
+      allocRequest.addAllAsks(containerList);
+      List<ContainerId> releasedContainers = Lists.newArrayListWithCapacity(0);
+      allocRequest.setResponseId(lastResponseId.get());
+      allocRequest.setApplicationAttemptId(appAttemptId);
+      allocRequest.addAllReleases(releasedContainers);
+      allocRequest.setProgress(progress);
+      AllocateResponse allocResponse = resourceManager.allocate(allocRequest);
+      AMResponse amResponse = allocResponse.getAMResponse();
+      if (amResponse.getResponseId() != lastResponseId.get()) {
+        lastResponseId.set(amResponse.getResponseId());
+      }
+      checkForRebootFlag(amResponse);
+      // now, make THIS our new HEARTBEAT object, but with ZERO new requests!
+      initHeartbeatRequestObject(allocRequest);
+      return amResponse;
+    } catch (YarnRemoteException yre) {
+      throw new IllegalStateException("Giraph Application Master could not " +
+        "successfully allocate the specified containers from the RM.", yre);
+    }
+  }
+
+  /**
+   * If the YARN RM gets way out of sync with our App Master, it's time to
+   * fail the job/restart. This should trigger the job end and cleanup.
+   * @param amResponse RPC response from YARN RM to check for reboot flag.
+   */
+  private void checkForRebootFlag(AMResponse amResponse) {
+    if (amResponse.getReboot()) {
+      LOG.error("AMResponse: " + amResponse + " raised YARN REBOOT FLAG!");
+      throw new RuntimeException("AMResponse " + amResponse +
+        " signaled GiraphApplicationMaster with REBOOT FLAG. Failing job.");
+    }
+  }
+
+  /**
+   * Reuses the initial container request (switched to "0 asks" so no new allocs
+   * occur) and sends all heartbeats using that request object.
+   * @param allocRequest the allocation request object to use as heartbeat.
+   */
+  private void initHeartbeatRequestObject(AllocateRequest allocRequest) {
+    allocRequest.clearAsks();
+    allocRequest.addAllAsks(Lists.<ResourceRequest>newArrayListWithCapacity(0));
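+    // zero asks + zero releases == a pure liveness/progress ping to the RM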
+    heartbeat = allocRequest;
+  }
+
+  /**
+   * Utility to construct the ResourceRequest for our resource ask: all the
+   * Giraph containers we need, and their memory/priority requirements.
+   * @return a list of ResourceRequests to send (just one, for Giraph tasks)
+   */
+  private List<ResourceRequest> buildResourceRequests() {
+    // set up resource request for our Giraph BSP application
+    ResourceRequest resourceRequest = Records.newRecord(ResourceRequest.class);
+    resourceRequest.setHostName("*"); // hand pick our worker locality someday
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(GiraphConstants.GIRAPH_YARN_PRIORITY);
+    resourceRequest.setPriority(pri);
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setVirtualCores(1); // new YARN API, won't work version < 2.0.3
+    capability.setMemory(heapPerContainer);
+    resourceRequest.setCapability(capability);
+    resourceRequest.setNumContainers(containersToLaunch);
+    return ImmutableList.of(resourceRequest);
+  }
+
+  /**
+   * Obtain handle to RPC connection to Resource Manager.
+   * @return the AMRMProtocol handle to YARN RPC.
+   */
+  private AMRMProtocol getHandleToRm() {
+    YarnConfiguration yarnConf = new YarnConfiguration(giraphConf);
+    final InetSocketAddress rmAddress = yarnConf.getSocketAddr(
+      YarnConfiguration.RM_SCHEDULER_ADDRESS,
+      YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+      YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+    LOG.info("Connecting to ResourceManager at " + rmAddress);
+    if (UserGroupInformation.isSecurityEnabled()) {
+      UserGroupInformation currentUser;
+      try {
+        currentUser = UserGroupInformation.getCurrentUser();
+      } catch (IOException ioe) {
+        throw new IllegalStateException("Could not obtain UGI for user.", ioe);
+      }
+      String tokenURLEncodedStr = System.getenv(
+        ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
+      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
+      try {
+        token.decodeFromUrlString(tokenURLEncodedStr);
+      } catch (IOException ioe) {
+        throw new IllegalStateException("Could not decode token from URL", ioe);
+      }
+      SecurityUtil.setTokenService(token, rmAddress);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AppMasterToken is: " + token);
+      }
+      currentUser.addToken(token);
+      return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
+        @Override
+        public AMRMProtocol run() {
+          return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
+            rmAddress, giraphConf);
+        }
+      });
+    } else { // non-secure
+      return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
+        rmAddress, yarnConf);
+    }
+  }
+
+  /**
+   * Get the request to register this Application Master with the RM.
+   * @return the populated AM request.
+   */
+  private RegisterApplicationMasterRequest getRegisterAppMasterRequest() {
+    RegisterApplicationMasterRequest appMasterRequest =
+        Records.newRecord(RegisterApplicationMasterRequest.class);
+    appMasterRequest.setApplicationAttemptId(appAttemptId);
+    try {
+      appMasterRequest.setHost(InetAddress.getLocalHost().getHostName());
+    } catch (UnknownHostException uhe) {
+      throw new IllegalStateException(
+        "Cannot resolve GiraphApplicationMaster's local hostname.", uhe);
+    }
+    // useful for a Giraph WebUI or whatever: play with these
+    // appMasterRequest.setRpcPort(appMasterRpcPort);
+    // appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
+    return appMasterRequest;
+  }
+
+  /**
+   * Lazily compose the map of jar and file names to LocalResource records for
+   * inclusion in GiraphYarnTask container requests. We can reuse one map,
+   * since all Giraph tasks need identical HDFS-based resources (jars etc.).
+   * @return the resource map for a ContainerLaunchContext
+   */
+  private Map<String, LocalResource> getTaskResourceMap() {
+    // Set the local resources: just send the copies already in HDFS
+    if (null == LOCAL_RESOURCES) {
+      LOCAL_RESOURCES = Maps.newHashMap();
+      try {
+        // if you have to update the giraphConf for export to tasks, do it now
+        updateGiraphConfForExport();
+        YarnUtils.addFsResourcesToMap(LOCAL_RESOURCES, giraphConf,
+          appAttemptId.getApplicationId());
+      } catch (IOException ioe) {
+        // fail fast, this container will never launch.
+        throw new IllegalStateException("Could not configure the container" +
+          "launch context for GiraphYarnTasks.", ioe);
+      }
+    }
+    // else, return the prepopulated copy to reuse for each GiraphYarnTask
+    return LOCAL_RESOURCES;
+  }
+
+  /**
+   * If you're going to make ANY CHANGES to your local GiraphConfiguration
+   * while running the GiraphApplicationMaster, put them here.
+   * This method replaces the current XML file GiraphConfiguration
+   * stored in HDFS with the copy you have modified locally in-memory.
+   */
+  private void updateGiraphConfForExport()
+    throws IOException {
+    // Giraph expects this MapReduce stuff
+    giraphConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
+      appAttemptId.getAttemptId());
+    // now republish the giraph-conf.xml in HDFS
+    YarnUtils.exportGiraphConfiguration(giraphConf,
+      appAttemptId.getApplicationId());
+  }
+
+  /**
+   * Thread to connect to the {@link ContainerManager} and launch the container
+   * that will house one of our Giraph worker (or master) tasks.
+   */
+  private class LaunchContainerRunnable implements Runnable {
+    /** Allocated container */
+    private Container container;
+    /** Handle to communicate with ContainerManager */
+    private ContainerManager containerManager;
+    /** Heap memory in MB to allocate for this JVM in the launched container */
+    private final int heapSize;
+
+    /**
+     * Constructor.
+     * @param newGiraphTaskContainer Allocated container
+     * @param heapMb the <code>-Xmx</code> setting for each launched task.
+     */
+    public LaunchContainerRunnable(final Container newGiraphTaskContainer,
+      final int heapMb) {
+      this.container = newGiraphTaskContainer;
+      this.heapSize = heapMb;
+    }
+
+    /**
+     * Helper function to connect to ContainerManager, which resides on the
+     * same compute node as this Giraph task's container. The CM starts tasks.
+     */
+    private void connectToCM() {
+      LOG.debug("Connecting to CM for containerid=" + container.getId());
+      String cmIpPortStr = container.getNodeId().getHost() + ":" +
+        container.getNodeId().getPort();
+      InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
+      LOG.info("Connecting to CM at " + cmIpPortStr);
+      this.containerManager = (ContainerManager)
+        rpc.getProxy(ContainerManager.class, cmAddress, giraphConf);
+    }
+
+    /**
+     * Connects to CM, sets up container launch context
+     * for shell command and eventually dispatches the container
+     * start request to the CM.
+     */
+    public void run() {
+      // Connect to ContainerManager
+      connectToCM();
+      // configure the launcher for the Giraph task it will host
+      StartContainerRequest startReq =
+        Records.newRecord(StartContainerRequest.class);
+      startReq.setContainerLaunchContext(buildContainerLaunchContext());
+      // request CM to start this container as spec'd in ContainerLaunchContext
+      try {
+        containerManager.startContainer(startReq);
+      } catch (YarnRemoteException yre) {
+        LOG.error("StartContainerRequest failed for containerId=" +
+                    container.getId(), yre);
+      }
+    }
+
+    /**
+     * Boilerplate to set up the ContainerLaunchContext to tell the Container
+     * Manager how to launch our Giraph task in the execution container we have
+     * already allocated.
+     * @return a populated ContainerLaunchContext object.
+     */
+    private ContainerLaunchContext buildContainerLaunchContext() {
+      LOG.info("Setting up container launch container for containerid=" +
+        container.getId());
+      ContainerLaunchContext launchContext = Records
+        .newRecord(ContainerLaunchContext.class);
+      launchContext.setContainerId(container.getId());
+      launchContext.setResource(container.getResource());
+      // args inject the CLASSPATH, heap MB, and TaskAttemptID for launched task
+      final List<String> commands = generateShellExecCommand();
+      launchContext.setCommands(commands);
+      // add user information to the job
+      String jobUserName = "ERROR_UNKNOWN_USER";
+      UserGroupInformation ugi = null;
+      try {
+        ugi = UserGroupInformation.getCurrentUser();
+        jobUserName = ugi.getUserName();
+      } catch (IOException ioe) {
+        jobUserName =
+          System.getenv(ApplicationConstants.Environment.USER.name());
+      }
+      launchContext.setUser(jobUserName);
+      LOG.info("Setting username in ContainerLaunchContext to: " + jobUserName);
+      // Set the environment variables to inject into remote task's container
+      buildEnvironment(launchContext);
+      // Set the local resources: just send the copies already in HDFS
+      launchContext.setLocalResources(getTaskResourceMap());
+      return launchContext;
+    }
+
+    /**
+     * Generates our command line string used to launch our Giraph tasks.
+     * @return the BASH shell commands to launch the job.
+     */
+    private List<String> generateShellExecCommand() {
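+      // resulting command, with illustrative IDs:
+      //   java -Xmx1024M -Xms1024M -cp .:${CLASSPATH}
+      //     org.apache.giraph.yarn.GiraphYarnTask 1357000000000 1 2 1
+      //     1><LOG_DIR>/task-2-stdout.log 2><LOG_DIR>/task-2-stderr.log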
+      return ImmutableList.of("java " +
+        "-Xmx" + heapSize + "M " +
+        "-Xms" + heapSize + "M " +
+        "-cp .:${CLASSPATH} " +
+        "org.apache.giraph.yarn.GiraphYarnTask " +
+        appAttemptId.getApplicationId().getClusterTimestamp() + " " +
+        appAttemptId.getApplicationId().getId() + " " +
+        container.getId().getId() + " " +
+        appAttemptId.getAttemptId() + " " +
+        "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+        "/task-" + container.getId().getId() + "-stdout.log " +
+        "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+        "/task-" + container.getId().getId() + "-stderr.log "
+      );
+    }
+
+    /**
+     * Utility to populate the environment vars we wish to inject into the new
+     * container's env when the Giraph BSP task is executed.
+     * @param launchContext the launch context which will set our environment
+     *                      vars in the app master's execution container.
+     */
+    private void buildEnvironment(final ContainerLaunchContext launchContext) {
+      Map<String, String> classPathForEnv = Maps.<String, String>newHashMap();
+      // pick up the local classpath so when we instantiate a Configuration
+      // remotely, we also get the "mapred-site.xml" and "yarn-site.xml"
+      YarnUtils.addLocalClasspathToEnv(classPathForEnv, giraphConf);
+      // set this map of env vars into the launch context.
+      launchContext.setEnvironment(classPathForEnv);
+    }
+  }
+
+  /**
+   * Application entry point
+   * @param args command-line args (set by GiraphYarnClient, if any)
+   */
+  public static void main(final String[] args) {
+    String containerIdString =
+        System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV);
+    if (containerIdString == null) {
+      // container id should always be set in the env by the framework
+      throw new IllegalArgumentException("ContainerId not found in env vars.");
+    }
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
+    ApplicationAttemptId appAttemptId = containerId.getApplicationAttemptId();
+    try {
+      GiraphApplicationMaster giraphAppMaster =
+        new GiraphApplicationMaster(containerId, appAttemptId);
+      giraphAppMaster.run();
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (Throwable t) {
+      // CHECKSTYLE: resume IllegalCatch
+      LOG.error("GiraphApplicationMaster caught a " +
+                  "top-level exception in main.", t);
+      System.exit(2);
+    }
+    System.exit(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
new file mode 100644
index 0000000..341db0e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.yarn;
+
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+
+import com.google.common.collect.Sets;
+import java.util.Set;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.util.Records;
+
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The initial launcher for a YARN-based Giraph job. This class attempts to
+ * configure and send a request to the ResourceManager for a single
+ * application container to host the GiraphApplicationMaster. The RPC
+ * connection between GiraphYarnClient and the ResourceManager is handled
+ * by the YarnClientImpl superclass.
+ */
+public class GiraphYarnClient extends YarnClientImpl {
+  static {
+    Configuration.addDefaultResource("giraph-site.xml");
+  }
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(GiraphYarnClient.class);
+  /** Sleep time between silent progress checks */
+  private static final int JOB_STATUS_INTERVAL_MSECS = 800;
+  /** Memory (in MB) to allocate for our ApplicationMaster container */
+  private static final int YARN_APP_MASTER_MEMORY_MB = 1024;
+
+  /** human-readable job name */
+  private final String jobName;
+  /** Helper configuration from the job */
+  private final GiraphConfiguration giraphConf;
+  /** ApplicationId object (needed for RPC to ResourceManager) */
+  private ApplicationId appId;
+  /** Counts progress checks; a report prints every 5th check (~4 sec) */
+  private int reportCounter;
+
+  /**
+   * Constructor. Requires caller to hand us a GiraphConfiguration.
+   *
+   * @param giraphConf User-defined configuration
+   * @param jobName User-defined job name
+   */
+  public GiraphYarnClient(GiraphConfiguration giraphConf, String jobName)
+    throws IOException {
+    super();
+    this.reportCounter = 0;
+    this.jobName = jobName;
+    this.appId = null; // can't set this until after start()
+    this.giraphConf = giraphConf;
+    verifyOutputDirDoesNotExist();
+    super.init(this.giraphConf);
+  }
+
+  /**
+   * Submit a request to the Hadoop YARN cluster's ResourceManager
+   * to obtain an application container. This will run our ApplicationMaster,
+   * which will in turn request app containers for Giraph's master and all
+   * worker tasks.
+   * @param verbose Not yet implemented; present for compatibility
+   *                with GiraphJob
+   * @return true if job is successful
+   */
+  public boolean run(final boolean verbose) {
+    checkJobLocalZooKeeperSupported();
+    // init our connection to YARN ResourceManager RPC
+    start();
+    // request an application id from the RM
+    GetNewApplicationResponse getNewAppResponse;
+    try {
+      getNewAppResponse = super.getNewApplication();
+      // make sure we have the cluster resources to run the job.
+      checkPerNodeResourcesAvailable(getNewAppResponse);
+    } catch (YarnRemoteException yre) {
+      yre.printStackTrace();
+      return false;
+    }
+    appId = getNewAppResponse.getApplicationId();
+    LOG.info("Obtained new Application ID: " + appId);
+    // set pure-YARN configs before the conf is exported to HDFS
+    applyConfigsForYarnGiraphJob();
+    // configure our request for an exec container for GiraphApplicationMaster
+    ApplicationSubmissionContext appContext = createAppSubmissionContext();
+    ContainerLaunchContext containerContext = buildContainerLaunchContext();
+    appContext.setAMContainerSpec(containerContext);
+    LOG.info("ApplicationSumbissionContext for GiraphApplicationMaster " +
+      "launch container is populated.");
+    // make the request, blow up if fail, loop and report job progress if not
+    try {
+      // obtain an "updated copy" of the appId for status checks/job kill later
+      appId = super.submitApplication(appContext);
+    } catch (YarnRemoteException yre) {
+      throw new RuntimeException("submitApplication(appContext) FAILED.", yre);
+    }
+    LOG.info("GiraphApplicationMaster container request was submitted to " +
+      "ResourceManager for job: " + jobName);
+    return awaitGiraphJobCompletion();
+  }
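+
+  /*
+   * Illustrative only: a minimal driver sketch for this client, assuming a
+   * GiraphConfiguration already populated with the user's job settings
+   * (vertex classes, worker count, ZK quorum, etc.); the job name
+   * "MyGiraphJob" is hypothetical:
+   *
+   *   GiraphConfiguration conf = new GiraphConfiguration();
+   *   // ... populate conf; the constructor below throws IOException
+   *   GiraphYarnClient job = new GiraphYarnClient(conf, "MyGiraphJob");
+   *   System.exit(job.run(true) ? 0 : -1);
+   */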
+
+  /**
+   * Without Hadoop MR to check for us, make sure the output dir doesn't exist!
+   */
+  private void verifyOutputDirDoesNotExist() {
+    Path outDir = null;
+    try {
+      FileSystem fs = FileSystem.get(giraphConf);
+      String errorMsg = "__ERROR_NO_OUTPUT_DIR_SET__";
+      outDir =
+        new Path(fs.getHomeDirectory(), giraphConf.get(OUTDIR, errorMsg));
+      // if getFileStatus() succeeds, something (a file, dir, or symlink)
+      // already occupies our output path
+      FileStatus outStatus = fs.getFileStatus(outDir);
+      if (outStatus.isDirectory() || outStatus.isFile() ||
+        outStatus.isSymlink()) {
+        throw new IllegalStateException("Path " + outDir + " already exists.");
+      }
+    } catch (IOException ioe) {
+      // a FileNotFoundException here is the desired outcome: the path is free
+      LOG.info("Final output path is: " + outDir);
+    }
+  }
+
+  /**
+   * Configuration settings we need to customize for a Giraph on YARN
+   * job. We need to call this EARLY in the job, before the GiraphConfiguration
+   * is exported to HDFS for localization in each task container.
+   */
+  private void applyConfigsForYarnGiraphJob() {
+    GiraphConstants.IS_PURE_YARN_JOB.set(giraphConf, true);
+    GiraphConstants.SPLIT_MASTER_WORKER.set(giraphConf, true);
+    giraphConf.set("mapred.job.id", "giraph_yarn_" + appId); // ZK app base path
+  }
+
+  /**
+   * Utility to make sure we have the cluster resources we need to run this
+   * job. If they are not available, we should die here before too much setup.
+   * @param cluster the GetNewApplicationResponse from the YARN RM.
+   */
+  private void checkPerNodeResourcesAvailable(
+    final GetNewApplicationResponse cluster) {
+    // are there enough containers to go around for our Giraph job?
+    List<NodeReport> nodes = null;
+    int numContainers = 0;
+    long totalAvailable = 0;
+    try {
+      nodes = super.getNodeReports();
+    } catch (YarnRemoteException yre) {
+      throw new RuntimeException("GiraphYarnClient could not connect with " +
+        "the YARN ResourceManager to determine the number of available " +
+        "application containers.", yre);
+    }
+    for (NodeReport node : nodes) {
+      numContainers += node.getNumContainers();
+      totalAvailable += node.getCapability().getMemory();
+    }
+    // 1 master + all workers in -w command line arg
+    final int workers = giraphConf.getMaxWorkers() + 1;
+    if (workers < numContainers) {
+      throw new RuntimeException("Giraph job requires " + workers +
+        " containers to run; cluster only hosts " + numContainers);
+    }
+    checkAndAdjustPerTaskHeapSize(cluster);
+    final long totalAsk =
+      (long) giraphConf.getYarnTaskHeapMb() * workers;
+    if (totalAsk > totalAvailable) {
+      throw new IllegalStateException("Giraph's estimated cluster heap " +
+        totalAsk + "MB ask is greater than the current available cluster " +
+        "heap of " + totalAvailable + "MB. Aborting Job.");
+    }
+  }
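+
+  /*
+   * Worked example of the resource math above, with hypothetical numbers:
+   * "-w 4" plus one master yields workers = 5, so the job needs 5 available
+   * containers and a total heap ask of 5 * getYarnTaskHeapMb() MB (e.g.
+   * 5 * 1024MB = 5120MB), which must not exceed the summed memory
+   * capability reported in the cluster's NodeReports.
+   */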
+
+  /**
+   * Adjust the user-supplied <code>-yh</code> per-task heap setting if it
+   * is too small or large for the current cluster's containers, and
+   * re-record the new setting in the GiraphConfiguration for export.
+   * @param gnar the GetNewAppResponse from the YARN ResourceManager.
+   */
+  private void checkAndAdjustPerTaskHeapSize(GetNewApplicationResponse gnar) {
+    // do we have the right heap size on these cluster nodes to run our job?
+    final int minCapacity = gnar.getMinimumResourceCapability().getMemory();
+    final int maxCapacity = gnar.getMaximumResourceCapability().getMemory();
+    // make sure heap size is OK for this cluster's available containers
+    int giraphMem = giraphConf.getYarnTaskHeapMb();
+    if (giraphMem == GiraphConstants.GIRAPH_YARN_TASK_HEAP_MB_DEFAULT) {
+      LOG.info("Defaulting per-task heap size to " + giraphMem + "MB.");
+    }
+    if (giraphMem > maxCapacity) {
+      LOG.info("Giraph's request of heap MB per-task is more than the " +
+        "minimum; downgrading Giraph to" + maxCapacity + "MB.");
+      giraphMem = maxCapacity;
+    }
+    if (giraphMem < minCapacity) {
+      LOG.info("Giraph's request of heap MB per-task is less than the " +
+        "minimum; upgrading Giraph to " + minCapacity + "MB.");
+      giraphMem = minCapacity;
+    }
+    giraphConf.setYarnTaskHeapMb(giraphMem); // record any changes made
+  }
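+
+  /*
+   * Net effect of the adjustments above: the per-task heap request is
+   * clamped into the cluster's allowed range, equivalent to:
+   *
+   *   giraphMem = Math.max(minCapacity, Math.min(giraphMem, maxCapacity));
+   */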
+
+  /**
+   * Poll the job's progress until it completes, sleeping between status
+   * checks; if a status request fails, forcibly kill the application.
+   * @return true if job run is successful.
+   */
+  private boolean awaitGiraphJobCompletion() {
+    boolean done;
+    ApplicationReport report = null;
+    try {
+      do {
+        try {
+          Thread.sleep(JOB_STATUS_INTERVAL_MSECS);
+        } catch (InterruptedException ir) {
+          LOG.info("Progress reporter's sleep was interrupted!", ir);
+        }
+        report = super.getApplicationReport(appId);
+        done = checkProgress(report);
+      } while (!done);
+      if (!giraphConf.metricsEnabled()) {
+        cleanupJarCache();
+      }
+    } catch (IOException ex) {
+      final String diagnostics = (null == report) ? "" :
+        "Diagnostics: " + report.getDiagnostics();
+      LOG.error("Fatal fault encountered, failing " + jobName + ". " +
+        diagnostics, ex);
+      try {
+        LOG.error("FORCIBLY KILLING Application from AppMaster.");
+        super.killApplication(appId);
+      } catch (YarnRemoteException yre) {
+        LOG.error("Exception raised in attempt to kill application.", yre);
+      }
+      return false;
+    }
+    return printFinalJobReport();
+  }
+
+  /**
+   * Deletes the per-job HDFS cache directory that stands in for Hadoop's
+   * DistributedCache under YARN. If metrics are enabled, this is not called,
+   * so the cache can be examined after the run.
+   * @throws IOException if the cache directory cannot be deleted.
+   */
+  private void cleanupJarCache() throws IOException {
+    FileSystem fs = FileSystem.get(giraphConf);
+    Path baseCacheDir = YarnUtils.getFsCachePath(fs, appId);
+    if (fs.exists(baseCacheDir)) {
+      LOG.info("Cleaning up HDFS distributed cache directory for Giraph job.");
+      fs.delete(baseCacheDir, true); // recursive delete removes dir + contents
+    }
+  }
+
+  /**
+   * Print final formatted job report for local client that initiated this run.
+   * @return true for app success, false for failure.
+   */
+  private boolean printFinalJobReport() {
+    ApplicationReport report;
+    try {
+      report = super.getApplicationReport(appId);
+      FinalApplicationStatus finalAppStatus =
+        report.getFinalApplicationStatus();
+      final long secs =
+        (report.getFinishTime() - report.getStartTime()) / 1000L;
+      final String time = String.format("%d minutes, %d seconds.",
+        secs / 60L, secs % 60L);
+      LOG.info("Completed " + jobName + ": " +
+        finalAppStatus.name() + ", total running time: " + time);
+    } catch (YarnRemoteException yre) {
+      LOG.error("Exception encountered while attempting to request " +
+        "a final job report for " + jobName , yre);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Compose the ContainerLaunchContext for the Application Master.
+   * @return the CLC object populated and configured.
+   */
+  private ContainerLaunchContext buildContainerLaunchContext() {
+    ContainerLaunchContext appMasterContainer =
+      Records.newRecord(ContainerLaunchContext.class);
+    appMasterContainer.setEnvironment(buildEnvironment());
+    appMasterContainer.setLocalResources(buildLocalResourceMap());
+    appMasterContainer.setCommands(buildAppMasterExecCommand());
+    appMasterContainer.setResource(buildContainerMemory());
+    // resolve the actual user name; Environment.USER.name() alone would
+    // yield the literal string "USER"
+    appMasterContainer.setUser(
+      System.getenv(ApplicationConstants.Environment.USER.name()));
+    return appMasterContainer;
+  }
+
+  /**
+   * Assess whether the job is already finished or failed (so the 'done' flag
+   * can be set); prints a progress display for the client if all is well.
+   * @param report the application report to assess.
+   * @return true if job report indicates the job run is over.
+   */
+  private boolean checkProgress(final ApplicationReport report) {
+    YarnApplicationState jobState = report.getYarnApplicationState();
+    if (jobState == YarnApplicationState.FINISHED ||
+      jobState == YarnApplicationState.KILLED) {
+      return true;
+    } else if (jobState == YarnApplicationState.FAILED) {
+      LOG.error(jobName + " reports FAILED state, diagnostics show: " +
+        report.getDiagnostics());
+      return true;
+    } else {
+      if (reportCounter++ % 5 == 0) {
+        displayJobReport(report);
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Display a formatted summary of the job progress report from the AM.
+   * @param report the report to display.
+   */
+  private void displayJobReport(final ApplicationReport report) {
+    if (null == report) {
+      throw new IllegalStateException("[*] Latest ApplicationReport for job " +
+        jobName + " was not received by the local client.");
+    }
+    final float elapsed =
+      (System.currentTimeMillis() - report.getStartTime()) / 1000.0f;
+    LOG.info(jobName + ", Elapsed: " + String.format("%.2f secs", elapsed));
+    LOG.info(report.getCurrentApplicationAttemptId() + ", State: " +
+      report.getYarnApplicationState().name() + ", Containers used: " +
+      report.getApplicationResourceUsageReport().getNumUsedContainers());
+  }
+
+  /**
+   * Utility to produce the command line to activate the AM from the shell.
+   * @return A <code>List<String></code> of shell commands to execute in
+   *         the container allocated to us by the RM to host our App Master.
+   */
+  private List<String> buildAppMasterExecCommand() {
+    // 'gam-' prefix is for GiraphApplicationMaster in log file names
+    return ImmutableList.of("${JAVA_HOME}/bin/java " +
+      "-Xmx" + YARN_APP_MASTER_MEMORY_MB + "M " +
+      "-Xms" + YARN_APP_MASTER_MEMORY_MB + "M " + // TODO: REMOVE examples jar!
+      "-cp .:${CLASSPATH} org.apache.giraph.yarn.GiraphApplicationMaster " +
+      "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/gam-stdout.log " +
+      "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/gam-stderr.log "
+    );
+  }
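+
+  /*
+   * With the constants above, and assuming LOG_DIR_EXPANSION_VAR expands to
+   * <LOG_DIR>, the generated command line reads roughly:
+   *
+   *   ${JAVA_HOME}/bin/java -Xmx1024M -Xms1024M -cp .:${CLASSPATH}
+   *     org.apache.giraph.yarn.GiraphApplicationMaster
+   *     1><LOG_DIR>/gam-stdout.log 2><LOG_DIR>/gam-stderr.log
+   */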
+
+  /**
+   * Check that the job is configured to use an externally managed ZooKeeper
+   * quorum; Giraph-managed ZK support can be added as the "pure YARN"
+   * Giraph profile expands.
+   */
+  private void checkJobLocalZooKeeperSupported() {
+    final String checkZkList = giraphConf.getZookeeperList();
+    if (checkZkList == null || checkZkList.isEmpty()) {
+      throw new IllegalArgumentException("Giraph on YARN does not " +
+        "currently support Giraph-managed ZK instances: use a standalone " +
+        "ZooKeeper: '" +
+        checkZkList + "'");
+    }
+  }
+
+  /**
+   * Register all local jar files from GiraphConstants.GIRAPH_YARN_LIBJARS
+   * in the LocalResources map, copying each to HDFS at the path under which
+   * it is registered.
+   * @param map the LocalResources map to populate.
+   * @throws IOException if a jar cannot be located or copied to HDFS.
+   */
+  private void addLocalJarsToResourceMap(Map<String, LocalResource> map)
+    throws IOException {
+    Set<String> jars = Sets.newHashSet();
+    String[] libJars = giraphConf.getYarnLibJars().split(",");
+    for (String libJar : libJars) {
+      jars.add(libJar);
+    }
+    FileSystem fs = FileSystem.get(giraphConf);
+    Path baseDir = YarnUtils.getFsCachePath(fs, appId);
+    for (Path jar : YarnUtils.getLocalFiles(jars)) {
+      LOG.info("Located local resource for export at: " + jar);
+      Path dest = new Path(baseDir, jar.getName());
+      fs.copyFromLocalFile(false, true, jar, dest);
+      YarnUtils.addFileToResourceMap(map, fs, dest);
+    }
+  }
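+
+  /*
+   * YarnUtils.addFileToResourceMap is defined elsewhere in this patch; a
+   * sketch of the standard YARN pattern it is assumed to follow, where
+   * "status" is the FileStatus of the uploaded jar at "dest":
+   *
+   *   LocalResource resource = Records.newRecord(LocalResource.class);
+   *   resource.setResource(ConverterUtils.getYarnUrlFromPath(dest));
+   *   resource.setSize(status.getLen());
+   *   resource.setTimestamp(status.getModificationTime());
+   *   resource.setType(LocalResourceType.FILE);
+   *   resource.setVisibility(LocalResourceVisibility.APPLICATION);
+   *   map.put(dest.getName(), resource);
+   */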
+
+  /**
+   * Construct the memory requirements for the AppMaster's container request.
+   * @return A Resource that wraps the memory request.
+   */
+  private Resource buildContainerMemory() {
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(YARN_APP_MASTER_MEMORY_MB);
+    return capability;
+  }
+
+  /**
+   * Create the mapping of environment vars that will be visible to the
+   * ApplicationMaster in its remote app container.
+   * @return a map of environment vars to set up for the AppMaster.
+   */
+  private Map<String, String> buildEnvironment() {
+    Map<String, String> environment =
+      Maps.<String, String>newHashMap();
+    YarnUtils.addLocalClasspathToEnv(environment, giraphConf);
+    // TODO: add java.class.path to env map if running a local YARN minicluster.
+    return environment;
+  }
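+
+  /*
+   * YarnUtils.addLocalClasspathToEnv is defined elsewhere in this patch; it
+   * is assumed to append the cluster's application classpath entries to the
+   * CLASSPATH variable the remote container will see, roughly:
+   *
+   *   StringBuilder cp = new StringBuilder(".");
+   *   for (String entry : conf.getStrings(
+   *       YarnConfiguration.YARN_APPLICATION_CLASSPATH)) {
+   *     cp.append(':').append(entry.trim());
+   *   }
+   *   env.put("CLASSPATH", cp.toString());
+   */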
+
+  /**
+   * Create the mapping of files and JARs to send to the GiraphApplicationMaster
+   * and from there on to the Giraph tasks.
+   * @return the map of jars to local resource paths for transport
+   *         to the host container that will run our AppMaster.
+   */
+  private Map<String, LocalResource> buildLocalResourceMap() {
+    Map<String, LocalResource> localResources =
+        Maps.<String, LocalResource>newHashMap();
+    try {
+      // export the GiraphConfiguration to HDFS for localization to remote tasks
+      YarnUtils.exportGiraphConfiguration(giraphConf, appId);
+      YarnUtils.addGiraphConfToLocalResourceMap(
+        giraphConf, appId, localResources);
+      // add jars from '-yj' cmd-line arg to resource map for localization
+      addLocalJarsToResourceMap(localResources);
+      return localResources;
+    } catch (IOException ioe) {
+      throw new IllegalStateException("Failed to build LocalResource map.", ioe);
+    }
+  }
+
+  /**
+   * Create the app submission context, and populate it.
+   * @return the populated ApplicationSubmissionContext for the AppMaster.
+   */
+  private ApplicationSubmissionContext createAppSubmissionContext() {
+    ApplicationSubmissionContext appContext =
+      Records.newRecord(ApplicationSubmissionContext.class);
+    appContext.setApplicationId(appId);
+    appContext.setApplicationName(jobName);
+    return appContext;
+  }
+}