You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by er...@apache.org on 2013/04/03 06:17:43 UTC

[2/2] git commit: updated refs/heads/trunk to b2dff27

GIRAPH-13: Port Giraph to YARN


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b2dff275
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b2dff275
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b2dff275

Branch: refs/heads/trunk
Commit: b2dff2751d8d3d768f788b39089688c18f6c1750
Parents: 67f5f74
Author: Eli Reisman <er...@apache.org>
Authored: Wed Apr 3 00:15:56 2013 -0400
Committer: Eli Reisman <er...@apache.org>
Committed: Wed Apr 3 00:15:56 2013 -0400

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 checkstyle.xml                                     |    5 +-
 giraph-core/pom.xml                                |  159 ++++-
 .../main/java/org/apache/giraph/GiraphRunner.java  |   15 +-
 .../java/org/apache/giraph/bsp/BspInputFormat.java |    5 +-
 .../apache/giraph/conf/GiraphConfiguration.java    |   65 ++
 .../org/apache/giraph/conf/GiraphConstants.java    |   17 +-
 .../org/apache/giraph/graph/GraphTaskManager.java  |   36 +-
 .../org/apache/giraph/master/BspServiceMaster.java |   24 +-
 .../apache/giraph/utils/ConfigurationUtils.java    |   42 +-
 .../org/apache/giraph/worker/BspServiceWorker.java |   32 +-
 .../giraph/yarn/GiraphApplicationMaster.java       |  699 +++++++++++++++
 .../org/apache/giraph/yarn/GiraphYarnClient.java   |  476 ++++++++++
 .../org/apache/giraph/yarn/GiraphYarnTask.java     |  240 +++++
 .../java/org/apache/giraph/yarn/YarnUtils.java     |  241 +++++
 .../java/org/apache/giraph/yarn/package-info.java  |   22 +
 .../java/org/apache/giraph/yarn/TestYarnJob.java   |  285 ++++++
 .../src/test/resources/capacity-scheduler.xml      |   26 +
 giraph-examples/pom.xml                            |  157 ++++-
 pom.xml                                            |   62 ++-
 20 files changed, 2578 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index e0a82a6..69261d2 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-13: Port Giraph to YARN (ereisman)
+
   GIRAPH-600: Create an option to do output during computation (majakabiljo)
 
   GIRAPH-599: Hive IO dependency issues with some Hadoop profiles (nitay via majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle.xml b/checkstyle.xml
index 370c120..66fd1ad 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -104,7 +104,10 @@
       <!-- Switch statements should be complete and with independent cases -->
     <module name="FallThrough" />
     <module name="MissingSwitchDefault" />
-    <module name="RedundantThrows"/>
+    <!-- For hadoop_yarn profile, some YARN exceptions aren't loading in checkstyle -->
+    <module name="RedundantThrows">
+        <property name="suppressLoadErrors" value="true" />
+    </module>
     <module name="SimplifyBooleanExpression"/>
     <module name="SimplifyBooleanReturn"/>
       <!-- Only one statment per line allowed -->

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-core/pom.xml b/giraph-core/pom.xml
index 3580d0c..2f473ed 100644
--- a/giraph-core/pom.xml
+++ b/giraph-core/pom.xml
@@ -136,6 +136,18 @@ under the License.
       <build>
         <plugins>
           <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+          <plugin>
             <groupId>org.sonatype.plugins</groupId>
             <artifactId>munge-maven-plugin</artifactId>
           </plugin>
@@ -148,6 +160,18 @@ under the License.
       <build>
         <plugins>
           <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+          <plugin>
             <groupId>org.sonatype.plugins</groupId>
             <artifactId>munge-maven-plugin</artifactId>
           </plugin>
@@ -160,6 +184,18 @@ under the License.
       <build>
         <plugins>
           <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+          <plugin>
             <groupId>org.sonatype.plugins</groupId>
             <artifactId>munge-maven-plugin</artifactId>
             <configuration>
@@ -184,6 +220,18 @@ under the License.
       <build>
         <plugins>
           <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+          <plugin>
             <groupId>org.sonatype.plugins</groupId>
             <artifactId>munge-maven-plugin</artifactId>
             <configuration>
@@ -212,6 +260,18 @@ under the License.
       <build>
         <plugins>
           <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+          <plugin>
             <groupId>org.sonatype.plugins</groupId>
             <artifactId>munge-maven-plugin</artifactId>
           </plugin>
@@ -223,24 +283,121 @@ under the License.
       </build>
     </profile>
 
+    <!-- Currently supports hadoop-2.0.3-alpha
+      (see hadoop_yarn profile in giraph-parent POM to change) -->
+    <profile>
+      <id>hadoop_yarn</id>
+      <build>
+        <plugins>
+          <plugin>
+              <groupId>org.sonatype.plugins</groupId>
+              <artifactId>munge-maven-plugin</artifactId>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+
+    <!-- Unmunged profiles are below. -->
+
     <profile>
       <id>hadoop_2.0.0</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
 
     <profile>
       <id>hadoop_2.0.1</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+
     </profile>
 
     <profile>
-      <id>hadoop_2.0.2</id>
+        <id>hadoop_2.0.2</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
 
     <profile>
       <id>hadoop_2.0.3</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
  
     <profile>
       <id>hadoop_trunk</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
   </profiles>
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
index 5bd5686..1bd79b5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
@@ -21,6 +21,9 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.giraph.utils.ConfigurationUtils;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.job.GiraphJob;
+/*if[PURE_YARN]
+import org.apache.giraph.yarn.GiraphYarnClient;
+end[PURE_YARN]*/
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.Path;
@@ -64,16 +67,26 @@ public class GiraphRunner implements Tool {
    * @return job run exit code
    */
   public int run(String[] args) throws Exception {
+    if (null == getConf()) { // for YARN profile
+      conf = new Configuration();
+    }
     GiraphConfiguration giraphConf = new GiraphConfiguration(getConf());
     CommandLine cmd = ConfigurationUtils.parseArgs(giraphConf, args);
     if (null == cmd) {
       return 0; // user requested help/info printout, don't run a job.
     }
 
+    // set up job for various platforms
     final String vertexClassName = args[0];
-    GiraphJob job = new GiraphJob(giraphConf, "Giraph: " + vertexClassName);
+    final String jobName = "Giraph: " + vertexClassName;
+    /*if[PURE_YARN]
+    GiraphYarnClient job = new GiraphYarnClient(giraphConf, jobName);
+    else[PURE_YARN]*/
+    GiraphJob job = new GiraphJob(giraphConf, jobName);
     prepareHadoopMRJob(job, cmd);
+    /*end[PURE_YARN]*/
 
+    // run the job, collect results
     if (LOG.isDebugEnabled()) {
       LOG.debug("Attempting to run Vertex: " + vertexClassName);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
index cc53271..8f88c80 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
@@ -52,13 +52,16 @@ public class BspInputFormat extends InputFormat<Text, Text> {
     int maxWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, 0);
     boolean splitMasterWorker = GiraphConstants.SPLIT_MASTER_WORKER.get(conf);
     int maxTasks = maxWorkers;
-    if (splitMasterWorker) {
+    // if this is a YARN job, separate ZK should already be running
+    boolean isYarnJob = GiraphConstants.IS_PURE_YARN_JOB.get(conf);
+    if (splitMasterWorker && !isYarnJob) {
       int zkServers = GiraphConstants.ZOOKEEPER_SERVER_COUNT.get(conf);
       maxTasks += zkServers;
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("getMaxTasks: Max workers = " + maxWorkers +
           ", split master/worker = " + splitMasterWorker +
+          ", is YARN-only job = " + isYarnJob +
           ", total max tasks = " + maxTasks);
     }
     return maxTasks;

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 040c26f..8a78313 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -434,6 +434,10 @@ public class GiraphConfiguration extends Configuration
     set(ZOOKEEPER_LIST, serverList);
   }
 
+  /**
+   * Getter for SPLIT_MASTER_WORKER flag.
+   * @return boolean flag value.
+   */
   public final boolean getSplitMasterWorker() {
     return SPLIT_MASTER_WORKER.get(this);
   }
@@ -475,6 +479,50 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Is this a "pure YARN" Giraph job, or is a MapReduce layer (v1 or v2)
+   * actually managing our cluster nodes, i.e. each task is a Mapper.
+   * @return TRUE if this is a pure YARN job.
+   */
+  public boolean isPureYarnJob() {
+    return IS_PURE_YARN_JOB.get(this);
+  }
+
+  /**
+   * Jars required in "Pure YARN" jobs (names only, no paths) should
+   * be listed here in full, including Giraph framework jar(s).
+   * @return the comma-separated list of jar names for export to cluster.
+   */
+  public String getYarnLibJars() {
+    return GIRAPH_YARN_LIBJARS.get(this);
+  }
+
+  /**
+   * Populate jar list for Pure YARN jobs.
+   * @param jarList a comma-separated list of jar names
+   */
+  public void setYarnLibJars(String jarList) {
+    GIRAPH_YARN_LIBJARS.set(this, jarList);
+  }
+
+  /**
+   * Get heap size (in MB) for each task in our Giraph job run,
+   * assuming this job will run on the "pure YARN" profile.
+   * @return the heap size for all tasks, in MB
+   */
+  public int getYarnTaskHeapMb() {
+    return GIRAPH_YARN_TASK_HEAP_MB.get(this);
+  }
+
+  /**
+   * Set heap size for Giraph tasks in our job run, assuming
+   * the job will run on the "pure YARN" profile.
+   * @param heapMb the heap size for all tasks
+   */
+  public void setYarnTaskHeapMb(int heapMb) {
+    GIRAPH_YARN_TASK_HEAP_MB.set(this, heapMb);
+  }
+
+  /**
    * Get the ZooKeeper list.
    *
    * @return ZooKeeper list of strings, comma separated or null if none set.
@@ -496,10 +544,27 @@ public class GiraphConfiguration extends Configuration
     return LOG_THREAD_LAYOUT.get(this);
   }
 
+  /**
+   * is this job run a local test?
+   * @return the test status as recorded in the Configuration
+   */
   public boolean getLocalTestMode() {
     return LOCAL_TEST_MODE.get(this);
   }
 
+  /**
+   * Flag this job as a local test run.
+   * @param flag the test status for this job
+   */
+  public void setLocalTestMode(boolean flag) {
+    LOCAL_TEST_MODE.set(this, flag);
+  }
+
+  /**
+   * The number of server tasks in our ZK quorum for
+   * this job run.
+   * @return the number of ZK servers in the quorum
+   */
   public int getZooKeeperServerCount() {
     return ZOOKEEPER_SERVER_COUNT.get(this);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index eaa8363..730fa5b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -141,8 +141,21 @@ public interface GiraphConstants {
   BooleanConfOption VERTEX_OUTPUT_FORMAT_THREAD_SAFE =
       new BooleanConfOption("giraph.vertexOutputFormatThreadSafe", false);
 
-  /** Output Format Path (for Giraph-on-YARN) */
-  String GIRAPH_OUTPUT_DIR = "giraph.output.dir";
+  /** conf key for comma-separated list of jars to export to YARN workers */
+  StrConfOption GIRAPH_YARN_LIBJARS =
+    new StrConfOption("giraph.yarn.libjars", "");
+  /** Name of the XML file that will export our Configuration to YARN workers */
+  String GIRAPH_YARN_CONF_FILE = "giraph-conf.xml";
+  /** Giraph default heap size for all tasks when running on YARN profile */
+  int GIRAPH_YARN_TASK_HEAP_MB_DEFAULT = 1024;
+  /** Name of Giraph property for user-configurable heap memory per worker */
+  IntConfOption GIRAPH_YARN_TASK_HEAP_MB = new IntConfOption(
+    "giraph.yarn.task.heap.mb", GIRAPH_YARN_TASK_HEAP_MB_DEFAULT);
+  /** Default priority level in YARN for our task containers */
+  int GIRAPH_YARN_PRIORITY = 10;
+  /** Is this a pure YARN job (i.e. no MapReduce layer managing Giraph tasks) */
+  BooleanConfOption IS_PURE_YARN_JOB =
+    new BooleanConfOption("giraph.pure.yarn.job", false);
 
   /** Vertex index class */
   ClassConfOption<WritableComparable> VERTEX_ID_CLASS =

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 3ae5ed3..8ed44e8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -94,10 +94,11 @@ import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_CLASS;
 public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   E extends Writable, M extends Writable> implements
   ResetSuperstepMetricsObserver {
-  static {
+  /*if_not[PURE_YARN]
+  static { // Eliminate this? Even MRv1 tasks should not need it here.
     Configuration.addDefaultResource("giraph-site.xml");
   }
-
+  end[PURE_YARN]*/
   /** Name of metric for superstep time in msec */
   public static final String TIMER_SUPERSTEP_TIME = "superstep-time-ms";
   /** Name of metric for compute on all vertices in msec */
@@ -157,6 +158,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   private String serverPortList;
   /** The Hadoop Mapper#Context for this job */
   private Mapper<?, ?, ?, ?>.Context context;
+  /** is this GraphTaskManager the master? */
+  private boolean isMaster;
 
   /**
    * Default constructor for GiraphTaskManager.
@@ -165,6 +168,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
    */
   public GraphTaskManager(Mapper<?, ?, ?, ?>.Context context) {
     this.context = context;
+    this.isMaster = false;
   }
 
   /**
@@ -174,9 +178,9 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   public void setup(Path[] zkPathList)
     throws IOException, InterruptedException {
     context.setStatus("setup: Beginning worker setup.");
-    determineClassTypes(context.getConfiguration());
     conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
       context.getConfiguration());
+    determineClassTypes(conf);
     // configure global logging level for Giraph job
     initializeAndConfigureLogging();
     // init the metrics objects
@@ -302,6 +306,23 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   }
 
   /**
+   * Sets the "isMaster" flag for final output commit to happen on master.
+   * @param im the boolean input to set isMaster. Applies to "pure YARN only"
+   */
+  public void setIsMaster(final boolean im) {
+    this.isMaster = im;
+  }
+
+  /**
+   * Get "isMaster" status flag -- we need to know if we're the master in the
+   * "finally" block of our GiraphYarnTask#execute() to commit final job output.
+   * @return true if this task IS the master.
+   */
+  public boolean isMaster() {
+    return isMaster;
+  }
+
+  /**
    * Produce a reference to the "start" superstep timer for the current
    * superstep.
    * @param superstep the current superstep count
@@ -455,7 +476,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
 
   /**
    * Copied from JobConf to get the location of this jar.  Workaround for
-   * things like Oozie map-reduce jobs.
+   * things like Oozie map-reduce jobs. NOTE: Pure YARN profile cannot
+   * make use of this, as the jars are unpacked at each container site.
    *
    * @param myClass Class to search the class loader path for to locate
    *        the relevant jar file
@@ -574,7 +596,6 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   private void locateZookeeperClasspath(Path[] fileClassPaths)
     throws IOException {
     if (!conf.getLocalTestMode()) {
-      //Path[] fileClassPaths = DistributedCache.getLocalCacheArchives(conf);
       String zkClasspath = null;
       if (fileClassPaths == null) {
         if (LOG.isInfoEnabled()) {
@@ -584,7 +605,10 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
         if (jarFile == null) {
           jarFile = findContainingJar(getClass());
         }
-        zkClasspath = jarFile.replaceFirst("file:", "");
+        // Pure YARN profiles will use unpacked resources, so calls
+        // to "findContainingJar()" in that context can return NULL!
+        zkClasspath = null == jarFile ?
+          "./*" : jarFile.replaceFirst("file:", "");
       } else {
         StringBuilder sb = new StringBuilder();
         sb.append(fileClassPaths[0]);

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 404e47e..9f4bcbf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -344,15 +344,21 @@ public class BspServiceMaster<I extends WritableComparable,
     LOG.fatal("failJob: Killing job " + getJobId());
     LOG.fatal("failJob: exception " + e.toString());
     try {
-      @SuppressWarnings("deprecation")
-      org.apache.hadoop.mapred.JobClient jobClient =
+      if (getConfiguration().isPureYarnJob()) {
+        throw new RuntimeException(
+          "BspServiceMaster (YARN profile) is " +
+          "FAILING this task, throwing exception to end job run.", e);
+      } else {
+        @SuppressWarnings("deprecation")
+        org.apache.hadoop.mapred.JobClient jobClient =
           new org.apache.hadoop.mapred.JobClient(
-              (org.apache.hadoop.mapred.JobConf)
-              getContext().getConfiguration());
-      @SuppressWarnings("deprecation")
-      JobID jobId = JobID.forName(getJobId());
-      RunningJob job = jobClient.getJob(jobId);
-      job.killJob();
+            (org.apache.hadoop.mapred.JobConf)
+            getContext().getConfiguration());
+        @SuppressWarnings("deprecation")
+        JobID jobId = JobID.forName(getJobId());
+        RunningJob job = jobClient.getJob(jobId);
+        job.killJob();
+      }
     } catch (IOException ioe) {
       throw new RuntimeException(ioe);
     } finally {
@@ -1737,6 +1743,7 @@ public class BspServiceMaster<I extends WritableComparable,
     }
 
     if (isMaster) {
+      getGraphTaskManager().setIsMaster(true);
       cleanUpZooKeeper();
       // If desired, cleanup the checkpoint directory
       if (GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.get(conf)) {
@@ -1750,7 +1757,6 @@ public class BspServiceMaster<I extends WritableComparable,
         }
       }
       aggregatorHandler.close();
-
       masterClient.closeConnections();
       masterServer.close();
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index bd30455..9ebe693 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -17,6 +17,9 @@
  */
 package org.apache.giraph.utils;
 
+/*if[PURE_YARN]
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
+end[PURE_YARN]*/
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 import org.apache.commons.cli.BasicParser;
@@ -41,6 +44,8 @@ import org.apache.giraph.partition.Partition;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.edge.VertexEdges;
 import org.apache.giraph.worker.WorkerContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -57,16 +62,33 @@ public final class ConfigurationUtils {
   private static final Logger LOG =
     Logger.getLogger(ConfigurationUtils.class);
   /** The base path for output dirs as saved in GiraphConfiguration */
-  private static final Path BASE_OUTPUT_DIR =
-    new Path("hdfs://user/" + System.getenv("USER"));
+  private static final Path BASE_OUTPUT_PATH;
+  static {
+    // whether local or remote, if there's no *-site.xml's to find, we're done
+    try {
+      BASE_OUTPUT_PATH = FileSystem.get(new Configuration()).getHomeDirectory();
+    } catch (IOException ioe) {
+      throw new IllegalStateException("Error locating default base path!", ioe);
+    }
+  }
   /** Maintains our accepted options in case the caller wants to add some */
   private static Options OPTIONS;
+  /*if_not[PURE_YARN]
+  private static String OUTDIR = ""; // no-op placeholder for YARN
+  end[PURE_YARN]*/
+
   static {
     OPTIONS = new Options();
     OPTIONS.addOption("h", "help", false, "Help");
     OPTIONS.addOption("la", "listAlgorithms", false, "List supported " +
         "algorithms");
     OPTIONS.addOption("q", "quiet", false, "Quiet output");
+    OPTIONS.addOption("yj", "yarnjars", true, "comma-separated list of JAR " +
+      "filenames to distribute to Giraph tasks and ApplicationMaster. " +
+      "YARN only. Search order: CLASSPATH, HADOOP_HOME, user current dir.");
+    OPTIONS.addOption("yh", "yarnheap", true, "Heap size, in MB, for each " +
+      "Giraph task (YARN only.) Defaults to " + // print the int default
+      GiraphConstants.GIRAPH_YARN_TASK_HEAP_MB_DEFAULT + " MB.");
     OPTIONS.addOption("w", "workers", true, "Number of workers");
     OPTIONS.addOption("vif", "vertexInputFormat", true, "Vertex input format");
     OPTIONS.addOption("eif", "edgeInputFormat", true, "Edge input format");
@@ -304,11 +326,20 @@ public final class ConfigurationUtils {
         }
       }
     }
+    // YARN-ONLY OPTIONS
+    if (cmd.hasOption("yj")) {
+      giraphConfiguration.setYarnLibJars(cmd.getOptionValue("yj"));
+    }
+    if (cmd.hasOption("yh")) {
+      giraphConfiguration.setYarnTaskHeapMb(
+        Integer.parseInt(cmd.getOptionValue("yh")));
+    }
     if (cmd.hasOption("of")) {
       if (cmd.hasOption("op")) {
-        Path outputDir = new Path(BASE_OUTPUT_DIR, cmd.getOptionValue("op"));
-        giraphConfiguration.set(
-          GiraphConstants.GIRAPH_OUTPUT_DIR, outputDir.toString());
+        Path outputDir = new Path(BASE_OUTPUT_PATH, cmd.getOptionValue("op"));
+        outputDir = // for YARN conf to get the out dir we need w/o a Job obj
+          outputDir.getFileSystem(giraphConfiguration).makeQualified(outputDir);
+        giraphConfiguration.set(OUTDIR, outputDir.toString());
       } else {
         if (LOG.isInfoEnabled()) {
           LOG.info("No output path specified. Ensure your OutputFormat " +
@@ -316,6 +347,7 @@ public final class ConfigurationUtils {
         }
       }
     }
+    // END YARN-ONLY OPTIONS
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 35db999..2ea91b5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.CreateMode;
@@ -978,9 +979,30 @@ else[HADOOP_NON_SECURE]*/
       getContext().progress();
       ++partitionIndex;
     }
-    vertexWriter.close(getContext());
+    vertexWriter.close(getContext()); // the temp results are saved now
     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
-        "saveVertices: Done saving vertices");
+      "saveVertices: Done saving vertices.");
+    // YARN: Hadoop isn't there to commit the "task" output, so we must.
+    if (getConfiguration().isPureYarnJob() &&
+      getConfiguration().getVertexOutputFormatClass() != null) {
+      try {
+        OutputCommitter outputCommitter =
+          vertexOutputFormat.getOutputCommitter(getContext());
+        if (outputCommitter.needsTaskCommit(getContext())) {
+          LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+            "OutputCommitter: committing task output.");
+          // transfer from temp dirs to "task commit" dirs to prep for
+          // the master's OutputCommitter#commitJob(context) call to finish.
+          outputCommitter.commitTask(getContext());
+        }
+      } catch (InterruptedException ie) {
+        LOG.error("Interrupted while attempting to obtain " +
+          "OutputCommitter.", ie);
+        Thread.currentThread().interrupt(); // restore interrupt flag
+      } catch (IOException ioe) {
+        LOG.error("Worker task's attempt to commit output has FAILED.", ioe);
+      }
+    }
   }
 
   @Override
@@ -1403,6 +1425,12 @@ else[HADOOP_NON_SECURE]*/
             "to see if it needs to restart");
       }
       JSONObject jsonObj = getJobState();
+      // in YARN, we have to manually commit our own output in 2 stages that we
+      // do not have to do in Hadoop-based Giraph. So jsonObj can be null.
+      if (getConfiguration().isPureYarnJob() && null == jsonObj) {
+        LOG.error("BspServiceWorker#getJobState() came back NULL.");
+        return false; // the event has been processed.
+      }
       try {
         if ((ApplicationState.valueOf(jsonObj.getString(JSONOBJ_STATE_KEY)) ==
             ApplicationState.START_SUPERSTEP) &&

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphApplicationMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphApplicationMaster.java b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphApplicationMaster.java
new file mode 100644
index 0000000..c2b88a0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphApplicationMaster.java
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.yarn;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import com.google.common.collect.Maps;
+import java.security.PrivilegedAction;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords
+  .FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords
+  .RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The YARN Application Master for Giraph is launched when the GiraphYarnClient
+ * successfully requests an execution container from the Resource Manager. The
+ * Application Master is provided by Giraph to manage all requests for resources
+ * (worker nodes, memory, jar files, job configuration metadata, etc.) that
+ * Giraph will need to perform the job. When Giraph runs in a non-YARN context,
+ * the role of the Application Master is played by Hadoop when it launches our
+ * GraphMappers (worker/master task nodes) to run the job.
+ */
+public class GiraphApplicationMaster {
+  /** Logger */
+  private static final Logger LOG =
+    Logger.getLogger(GiraphApplicationMaster.class);
+  /** Exit code for YARN containers that were manually killed/aborted */
+  private static final int YARN_ABORT_EXIT_STATUS = -100;
+  /** Exit code for successfully run YARN containers */
+  private static final int YARN_SUCCESS_EXIT_STATUS = 0;
+  /** millis to sleep between heartbeats during long loops */
+  private static final int SLEEP_BETWEEN_HEARTBEATS_MSECS = 900;
+  /** A reusable map of resources already in HDFS for each task to copy-to-local
+   * env and use to launch each GiraphYarnTask. */
+  private static Map<String, LocalResource> LOCAL_RESOURCES;
+  /** Initialize the Configuration class with the resource file exported by
+   * the YarnClient. We will need to export this resource to the tasks also.
+   * Construct the HEARTBEAT to use to ping the RM about job progress/health.
+   */
+  static {
+    // pick up new conf XML file and populate it with stuff exported from client
+    Configuration.addDefaultResource(GiraphConstants.GIRAPH_YARN_CONF_FILE);
+  }
+
+  /** Handle to AppMaster's RPC connection to YARN and the RM. */
+  private final AMRMProtocol resourceManager;
+  /** bootstrap handle to YARN RPC service */
+  private final YarnRPC rpc;
+  /** GiraphApplicationMaster's application attempt id */
+  private final ApplicationAttemptId appAttemptId;
+  /** GiraphApplicationMaster container id. Leave me here, I'm very useful */
+  private final ContainerId containerId;
+  /** number of containers Giraph needs (conf.getMaxWorkers() + 1 master) */
+  private final int containersToLaunch;
+  /** MB of JVM heap per Giraph task container */
+  private final int heapPerContainer;
+  /** Giraph configuration for this job, transported here by YARN framework */
+  private final ImmutableClassesGiraphConfiguration giraphConf;
+  /** Completed Containers Counter */
+  private final AtomicInteger completedCount;
+  /** Failed Containers Counter */
+  private final AtomicInteger failedCount;
+  /** Number of containers requested (hopefully '-w' from our conf) */
+  private final AtomicInteger allocatedCount;
+  /** Number of successfully completed containers in this job run. */
+  private final AtomicInteger successfulCount;
+  /** the ACK #'s for AllocateRequests + heartbeats == last response # */
+  private AtomicInteger lastResponseId;
+  /** Executor to attempt asynchronous launches of Giraph containers */
+  private ExecutorService executor;
+  /** YARN progress is a <code>float</code> between 0.0f and 1.0f */
+  private float progress;
+  /** An empty resource request with which to send heartbeats + progress */
+  private AllocateRequest heartbeat;
+
+  /**
+   * Construct the GiraphAppMaster, populate fields using env vars
+   * set up by YARN framework in this execution container.
+   * @param cId the ContainerId
+   * @param aId the ApplicationAttemptId
+   */
+  protected GiraphApplicationMaster(ContainerId cId, ApplicationAttemptId aId)
+    throws IOException {
+    containerId = cId; // future good stuff will need me to operate.
+    appAttemptId = aId;
+    progress = 0.0f;
+    lastResponseId = new AtomicInteger(0);
+    giraphConf =
+      new ImmutableClassesGiraphConfiguration(new GiraphConfiguration());
+    completedCount = new AtomicInteger(0);
+    failedCount = new AtomicInteger(0);
+    allocatedCount = new AtomicInteger(0);
+    successfulCount = new AtomicInteger(0);
+    rpc = YarnRPC.create(giraphConf);
+    resourceManager = getHandleToRm();
+    containersToLaunch = giraphConf.getMaxWorkers() + 1;
+    executor = Executors.newFixedThreadPool(containersToLaunch);
+    heapPerContainer = giraphConf.getYarnTaskHeapMb();
+  }
+
+  /**
+   * Coordinates all requests for Giraph's worker/master task containers, and
+   * manages application liveness heartbeat, completion status, teardown, etc.
+   */
+  private void run() {
+    // register Application Master with the YARN Resource Manager so we can
+    // begin requesting resources. The response contains useful cluster info.
+    try {
+      resourceManager.registerApplicationMaster(getRegisterAppMasterRequest());
+    } catch (IOException ioe) {
+      throw new IllegalStateException(
+        "GiraphApplicationMaster failed to register with RM.", ioe);
+    }
+
+    try {
+      // make the request only ONCE; only request more on container failure etc.
+      AMResponse amResponse = sendAllocationRequest();
+      logClusterResources(amResponse);
+      // loop here, waiting for TOTAL # REQUESTED containers to be available
+      // and launch them piecemeal they are reported to us in heartbeat pings.
+      launchContainersAsynchronously(amResponse);
+      // wait for the containers to finish & tally success/fails
+      awaitJobCompletion(); // all launched tasks are done before complete call
+    } finally {
+      // if we get here w/o problems, the executor is already long finished.
+      if (null != executor && !executor.isTerminated()) {
+        executor.shutdownNow(); // force kill, especially if got here by throw
+      }
+      // When the application completes, it should send a "finish request" to RM
+      try {
+        resourceManager.finishApplicationMaster(buildFinishAppMasterRequest());
+      } catch (YarnRemoteException yre) {
+        LOG.error("GiraphApplicationMaster failed to un-register with RM", yre);
+      }
+      if (null != rpc) {
+        rpc.stopProxy(resourceManager, giraphConf);
+      }
+    }
+  }
+
+  /**
+   * Reports the cluster resources in the AM response to our initial ask.
+   * @param amResponse the AM response from YARN.
+   */
+  private void logClusterResources(final AMResponse amResponse) {
+    // Check what the current available resources in the cluster are
+    Resource availableResources = amResponse.getAvailableResources();
+    LOG.info("Initial Giraph resource request for " + containersToLaunch +
+      " containers has been submitted. " +
+      "The RM reports cluster headroom is: " + availableResources);
+  }
+
+  /**
+   * Utility to build the final "job run is finished" request to the RM.
+   * @return the finish app master request, to send to the RM.
+   */
+  private FinishApplicationMasterRequest buildFinishAppMasterRequest() {
+    LOG.info("Application completed. Signalling finish to RM");
+    FinishApplicationMasterRequest finishRequest =
+      Records.newRecord(FinishApplicationMasterRequest.class);
+    finishRequest.setAppAttemptId(appAttemptId);
+    FinalApplicationStatus appStatus;
+    String appMessage = "Container Diagnostics: " +
+      " allocated=" + allocatedCount.get() +
+      ", completed=" + completedCount.get() +
+      ", succeeded=" + successfulCount.get() +
+      ", failed=" + failedCount.get();
+    if (successfulCount.get() == containersToLaunch) {
+      appStatus = FinalApplicationStatus.SUCCEEDED;
+    } else {
+      appStatus = FinalApplicationStatus.FAILED;
+    }
+    finishRequest.setDiagnostics(appMessage);
+    finishRequest.setFinishApplicationStatus(appStatus);
+    return finishRequest;
+  }
+
+  /**
+   * Loop and check the status of the containers until all are finished,
+   * logging how each container meets its end: success, error, or abort.
+   */
+  private void awaitJobCompletion() {
+    List<ContainerStatus> completedContainers;
+    do {
+      try {
+        Thread.sleep(SLEEP_BETWEEN_HEARTBEATS_MSECS);
+      } catch (InterruptedException ignored) {
+        final int notFinished = containersToLaunch - completedCount.get();
+        LOG.info("GiraphApplicationMaster interrupted from sleep while " +
+          " waiting for " + notFinished + "containers to finish job.");
+      }
+      updateProgress();
+      completedContainers =
+          sendHeartbeat().getAMResponse().getCompletedContainersStatuses();
+      for (ContainerStatus containerStatus : completedContainers) {
+        LOG.info("Got container status for containerID= " +
+          containerStatus.getContainerId() +
+          ", state=" + containerStatus.getState() +
+          ", exitStatus=" + containerStatus.getExitStatus() +
+          ", diagnostics=" + containerStatus.getDiagnostics());
+        switch (containerStatus.getExitStatus()) {
+        case YARN_SUCCESS_EXIT_STATUS:
+          successfulCount.incrementAndGet();
+          break;
+        case YARN_ABORT_EXIT_STATUS:
+          break; // not success or fail
+        default:
+          failedCount.incrementAndGet();
+          break;
+        }
+        completedCount.incrementAndGet();
+      } // end completion check loop
+    } while (completedCount.get() < containersToLaunch);
+  }
+
+  /** Update the progress value for our next heartbeat (allocate request) */
+  private void updateProgress() {
+    // set progress to "half done + ratio of completed containers so far"
+    final float ratio = completedCount.get() / (float) containersToLaunch;
+    progress = 0.5f + ratio / 2.0f;
+  }
+
+  /**
+   * Loop while checking container request status, adding each new bundle of
+   * containers allocated to our executor to launch (run Giraph BSP task) the
+   * job on each. Giraph's full resource request was sent ONCE, but these
+   * containers will become available in groups, over a period of time.
+   * @param amResponse metadata about our AllocateRequest's results.
+   */
+  private void launchContainersAsynchronously(AMResponse amResponse) {
+    List<Container> allocatedContainers;
+    do {
+      // get fresh report on # alloc'd containers, sleep between checks
+      if (null == amResponse) {
+        amResponse = sendHeartbeat().getAMResponse();
+      }
+      allocatedContainers = amResponse.getAllocatedContainers();
+      allocatedCount.addAndGet(allocatedContainers.size());
+      LOG.info("Waiting for task containers: " + allocatedCount.get() +
+        " allocated out of " + containersToLaunch + " required.");
+      startContainerLaunchingThreads(allocatedContainers);
+      amResponse = null;
+      try {
+        Thread.sleep(SLEEP_BETWEEN_HEARTBEATS_MSECS);
+      } catch (InterruptedException ignored) {
+        LOG.info("launchContainerAsynchronously() raised InterruptedException");
+      }
+    } while (containersToLaunch > allocatedCount.get());
+  }
+
+  /**
+   * For each container successfully allocated, attempt to set up and launch
+   * a Giraph worker/master task.
+   * @param allocatedContainers the containers we have currently allocated.
+   */
+  private void startContainerLaunchingThreads(final List<Container>
+    allocatedContainers) {
+    progress = allocatedCount.get() / (2.0f * containersToLaunch);
+    int placeholder = 0;
+    for (Container allocatedContainer : allocatedContainers) {
+      LOG.info("Launching shell command on a new container." +
+        ", containerId=" + allocatedContainer.getId() +
+        ", containerNode=" + allocatedContainer.getNodeId().getHost() +
+        ":" + allocatedContainer.getNodeId().getPort() +
+        ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() +
+        ", containerState=" + allocatedContainer.getState() +
+        ", containerResourceMemory=" +
+        allocatedContainer.getResource().getMemory());
+      // Launch and start the container on a separate thread to keep the main
+      // thread unblocked as all containers may not be allocated at one go.
+      LaunchContainerRunnable launchThread =
+        new LaunchContainerRunnable(allocatedContainer, heapPerContainer);
+      executor.execute(launchThread);
+    }
+  }
+
+  /**
+   * Sends heartbeat messages that include progress amounts. These are in the
+   * form of a YARN AllocateRequest object that asks for 0 resources.
+   * @return the AllocateResponse, which we may or may not need.
+   */
+  private AllocateResponse sendHeartbeat() {
+    heartbeat.setProgress(progress);
+    heartbeat.setResponseId(lastResponseId.incrementAndGet());
+    AllocateResponse allocateResponse = null;
+    try {
+      allocateResponse = resourceManager.allocate(heartbeat);
+      final int responseId = allocateResponse.getAMResponse().getResponseId();
+      if (responseId != lastResponseId.get()) {
+        lastResponseId.set(responseId);
+      }
+      checkForRebootFlag(allocateResponse.getAMResponse());
+      return allocateResponse;
+    } catch (YarnRemoteException yre) {
+      throw new IllegalStateException("sendHeartbeat() failed with " +
+        "YarnRemoteException: ", yre);
+    }
+  }
+
+  /**
+   * Compose and send the allocation request for our Giraph BSP worker/master
+   * compute nodes. Right now the requested containers are identical, mirroring
+   * Giraph's behavior when running on Hadoop MRv1. Giraph could use YARN
+   * to set fine-grained capability to each container, including host choice.
+   * @return The AM resource descriptor with our container allocations.
+   */
+  private AMResponse sendAllocationRequest() {
+    AllocateRequest allocRequest = Records.newRecord(AllocateRequest.class);
+    try {
+      List<ResourceRequest> containerList = buildResourceRequests();
+      allocRequest.addAllAsks(containerList);
+      List<ContainerId> releasedContainers = Lists.newArrayListWithCapacity(0);
+      allocRequest.setResponseId(lastResponseId.get());
+      allocRequest.setApplicationAttemptId(appAttemptId);
+      allocRequest.addAllReleases(releasedContainers);
+      allocRequest.setProgress(progress);
+      AllocateResponse allocResponse = resourceManager.allocate(allocRequest);
+      AMResponse amResponse = allocResponse.getAMResponse();
+      if (amResponse.getResponseId() != lastResponseId.get()) {
+        lastResponseId.set(amResponse.getResponseId());
+      }
+      checkForRebootFlag(amResponse);
+      // now, make THIS our new HEARTBEAT object, but with ZERO new requests!
+      initHeartbeatRequestObject(allocRequest);
+      return amResponse;
+    } catch (YarnRemoteException yre) {
+      throw new IllegalStateException("Giraph Application Master could not " +
+        "successfully allocate the specified containers from the RM.", yre);
+    }
+  }
+
+  /**
+   * If the YARN RM gets way out of sync with our App Master, its time to
+   * fail the job/restart. This should trigger the job end and cleanup.
+   * @param amResponse RPC response from YARN RM to check for reboot flag.
+   */
+  private void checkForRebootFlag(AMResponse amResponse) {
+    if (amResponse.getReboot()) {
+      LOG.error("AMResponse: " + amResponse + " raised YARN REBOOT FLAG!");
+      throw new RuntimeException("AMResponse " + amResponse +
+        " signaled GiraphApplicationMaster with REBOOT FLAG. Failing job.");
+    }
+  }
+
+
+  /**
+   * Reuses the initial container request (switched to "0 asks" so no new allocs
+   * occur) and sends all heartbeats using that request object.
+   * @param allocRequest the allocation request object to use as heartbeat.
+   */
+  private void initHeartbeatRequestObject(AllocateRequest allocRequest) {
+    allocRequest.clearAsks();
+    allocRequest.addAllAsks(Lists.<ResourceRequest>newArrayListWithCapacity(0));
+    heartbeat = allocRequest;
+  }
+
+  /**
+   * Utility to construct the ResourceRequest for our resource ask: all the
+   * Giraph containers we need, and their memory/priority requirements.
+   * @return a list of ResourceRequests to send (just one, for Giraph tasks)
+   */
+  private List<ResourceRequest> buildResourceRequests() {
+    // set up resource request for our Giraph BSP application
+    ResourceRequest resourceRequest = Records.newRecord(ResourceRequest.class);
+    resourceRequest.setHostName("*"); // hand pick our worker locality someday
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(GiraphConstants.GIRAPH_YARN_PRIORITY);
+    resourceRequest.setPriority(pri);
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setVirtualCores(1); // new YARN API, won't work version < 2.0.3
+    capability.setMemory(heapPerContainer);
+    resourceRequest.setCapability(capability);
+    resourceRequest.setNumContainers(containersToLaunch);
+    return ImmutableList.of(resourceRequest);
+  }
+
+  /**
+   * Obtain handle to RPC connection to Resource Manager.
+   * @return the AMRMProtocol handle to YARN RPC.
+   */
+  private AMRMProtocol getHandleToRm() {
+    YarnConfiguration yarnConf = new YarnConfiguration(giraphConf);
+    final InetSocketAddress rmAddress = yarnConf.getSocketAddr(
+      YarnConfiguration.RM_SCHEDULER_ADDRESS,
+      YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+      YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+    LOG.info("Connecting to ResourceManager at " + rmAddress);
+    if (UserGroupInformation.isSecurityEnabled()) {
+      UserGroupInformation currentUser;
+      try {
+        currentUser = UserGroupInformation.getCurrentUser();
+      } catch (IOException ioe) {
+        throw new IllegalStateException("Could not obtain UGI for user.", ioe);
+      }
+      String tokenURLEncodedStr = System.getenv(
+        ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
+      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
+      try {
+        token.decodeFromUrlString(tokenURLEncodedStr);
+      } catch (IOException ioe) {
+        throw new IllegalStateException("Could not decode token from URL", ioe);
+      }
+      SecurityUtil.setTokenService(token, rmAddress);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AppMasterToken is: " + token);
+      }
+      currentUser.addToken(token);
+      return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
+        @Override
+        public AMRMProtocol run() {
+          return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
+            rmAddress, giraphConf);
+        }
+      });
+    } else { // non-secure
+      return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
+        rmAddress, yarnConf);
+    }
+  }
+
+  /**
+   * Get the request to register this Application Master with the RM.
+   * @return the populated AM request.
+   */
+  private RegisterApplicationMasterRequest getRegisterAppMasterRequest() {
+    RegisterApplicationMasterRequest appMasterRequest =
+        Records.newRecord(RegisterApplicationMasterRequest.class);
+    appMasterRequest.setApplicationAttemptId(appAttemptId);
+    try {
+      appMasterRequest.setHost(InetAddress.getLocalHost().getHostName());
+    } catch (UnknownHostException uhe) {
+      throw new IllegalStateException(
+        "Cannot resolve GiraphApplicationMaster's local hostname.", uhe);
+    }
+    // useful for a Giraph WebUI or whatever: play with these
+    // appMasterRequest.setRpcPort(appMasterRpcPort);
+    // appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
+    return appMasterRequest;
+  }
+
+  /**
+   * Lazily compose the map of jar and file names to LocalResource records for
+   * inclusion in GiraphYarnTask container requests. Can re-use the same map
+   * as Giraph tasks need identical HDFS-based resources (jars etc.) to run.
+   * @return the resource map for a ContainerLaunchContext
+   */
+  private Map<String, LocalResource> getTaskResourceMap() {
+    // Set the local resources: just send the copies already in HDFS
+    if (null == LOCAL_RESOURCES) {
+      LOCAL_RESOURCES = Maps.newHashMap();
+      try {
+        // if you have to update the giraphConf for export to tasks, do it now
+        updateGiraphConfForExport();
+        YarnUtils.addFsResourcesToMap(LOCAL_RESOURCES, giraphConf,
+          appAttemptId.getApplicationId());
+      } catch (IOException ioe) {
+        // fail fast, this container will never launch.
+        throw new IllegalStateException("Could not configure the container" +
+          "launch context for GiraphYarnTasks.", ioe);
+      }
+    }
+    // else, return the prepopulated copy to reuse for each GiraphYarkTask
+    return LOCAL_RESOURCES;
+  }
+
+  /**
+   * If you're going to make ANY CHANGES to your local GiraphConfiguration
+   * while running the GiraphApplicationMaster, put them here.
+   * This method replaces the current XML file GiraphConfiguration
+   * stored in HDFS with the copy you have modified locally in-memory.
+   */
+  private void updateGiraphConfForExport()
+    throws IOException {
+    // Giraph expects this MapReduce stuff
+    giraphConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
+      appAttemptId.getAttemptId());
+    // now republish the giraph-conf.xml in HDFS
+    YarnUtils.exportGiraphConfiguration(giraphConf,
+      appAttemptId.getApplicationId());
+  }
+
+  /**
+   * Thread to connect to the {@link ContainerManager} and launch the container
+   * that will house one of our Giraph worker (or master) tasks.
+   */
+  private class LaunchContainerRunnable implements Runnable {
+    /** Allocated container */
+    private Container container;
+    /** Handle to communicate with ContainerManager */
+    private ContainerManager containerManager;
+    /** Heap memory in MB to allocate for this JVM in the launched container */
+    private final int heapSize;
+
+    /**
+     * Constructor.
+     * @param newGiraphTaskContainer Allocated container
+     * @param heapMb the <code>-Xmx</code> setting for each launched task.
+     */
+    public LaunchContainerRunnable(final Container newGiraphTaskContainer,
+      final int heapMb) {
+      this.container = newGiraphTaskContainer;
+      this.heapSize = heapMb;
+    }
+
+    /**
+     * Helper function to connect to ContainerManager, which resides on the
+     * same compute node as this Giraph task's container. The CM starts tasks.
+     */
+    private void connectToCM() {
+      LOG.debug("Connecting to CM for containerid=" + container.getId());
+      String cmIpPortStr = container.getNodeId().getHost() + ":" +
+        container.getNodeId().getPort();
+      InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
+      LOG.info("Connecting to CM at " + cmIpPortStr);
+      this.containerManager = (ContainerManager)
+        rpc.getProxy(ContainerManager.class, cmAddress, giraphConf);
+    }
+
+    /**
+     * Connects to CM, sets up container launch context
+     * for shell command and eventually dispatches the container
+     * start request to the CM.
+     */
+    public void run() {
+      // Connect to ContainerManager
+      connectToCM();
+      // configure the launcher for the Giraph task it will host
+      StartContainerRequest startReq =
+        Records.newRecord(StartContainerRequest.class);
+      startReq.setContainerLaunchContext(buildContainerLaunchContext());
+      // request CM to start this container as spec'd in ContainerLaunchContext
+      try {
+        containerManager.startContainer(startReq);
+      } catch (YarnRemoteException yre) {
+        LOG.error("StartContainerRequest failed for containerId=" +
+                    container.getId(), yre);
+      }
+    }
+
+    /**
+     * Boilerplate to set up the ContainerLaunchContext to tell the Container
+     * Manager how to launch our Giraph task in the execution container we have
+     * already allocated.
+     * @return a populated ContainerLaunchContext object.
+     */
+    private ContainerLaunchContext buildContainerLaunchContext() {
+      LOG.info("Setting up container launch container for containerid=" +
+        container.getId());
+      ContainerLaunchContext launchContext = Records
+        .newRecord(ContainerLaunchContext.class);
+      launchContext.setContainerId(container.getId());
+      launchContext.setResource(container.getResource());
+      // args inject the CLASSPATH, heap MB, and TaskAttemptID for launched task
+      final List<String> commands = generateShellExecCommand();
+      launchContext.setCommands(commands);
+      // add user information to the job
+      String jobUserName = "ERROR_UNKNOWN_USER";
+      UserGroupInformation ugi = null;
+      try {
+        ugi = UserGroupInformation.getCurrentUser();
+        jobUserName = ugi.getUserName();
+      } catch (IOException ioe) {
+        jobUserName =
+          System.getenv(ApplicationConstants.Environment.USER.name());
+      }
+      launchContext.setUser(jobUserName);
+      LOG.info("Setting username in ContainerLaunchContext to: " + jobUserName);
+      // Set the environment variables to inject into remote task's container
+      buildEnvironment(launchContext);
+      // Set the local resources: just send the copies already in HDFS
+      launchContext.setLocalResources(getTaskResourceMap());
+      return launchContext;
+    }
+
+    /**
+     * Generates our command line string used to launch our Giraph tasks.
+     * @return the BASH shell commands to launch the job.
+     */
+    private List<String> generateShellExecCommand() {
+      return ImmutableList.of("java " +
+        "-Xmx" + heapSize + "M " +
+        "-Xms" + heapSize + "M " +
+        "-cp .:${CLASSPATH} " +
+        "org.apache.giraph.yarn.GiraphYarnTask " +
+        appAttemptId.getApplicationId().getClusterTimestamp() + " " +
+        appAttemptId.getApplicationId().getId() + " " +
+        container.getId().getId() + " " +
+        appAttemptId.getAttemptId() + " " +
+        "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+        "/task-" + container.getId().getId() + "-stdout.log " +
+        "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+        "/task-" + container.getId().getId() + "-stderr.log "
+      );
+    }
+
+    /**
+     * Utility to populate the environment vars we wish to inject into the new
+     * containter's env when the Giraph BSP task is executed.
+     * @param launchContext the launch context which will set our environment
+     *                      vars in the app master's execution container.
+     */
+    private void buildEnvironment(final ContainerLaunchContext launchContext) {
+      Map<String, String> classPathForEnv = Maps.<String, String>newHashMap();
+      // pick up the local classpath so when we instantiate a Configuration
+      // remotely, we also get the "mapred-site.xml" and "yarn-site.xml"
+      YarnUtils.addLocalClasspathToEnv(classPathForEnv, giraphConf);
+      // set this map of env vars into the launch context.
+      launchContext.setEnvironment(classPathForEnv);
+    }
+  }
+
+  /**
+   * Application entry point
+   * @param args command-line args (set by GiraphYarnClient, if any)
+   */
+  public static void main(final String[] args) {
+    String containerIdString =
+        System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV);
+    if (containerIdString == null) {
+      // container id should always be set in the env by the framework
+      throw new IllegalArgumentException("ContainerId not found in env vars.");
+    }
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
+    ApplicationAttemptId appAttemptId = containerId.getApplicationAttemptId();
+    try {
+      GiraphApplicationMaster giraphAppMaster =
+        new GiraphApplicationMaster(containerId, appAttemptId);
+      giraphAppMaster.run();
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (Throwable t) {
+      // CHECKSTYLE: resume IllegalCatch
+      LOG.error("GiraphApplicationMaster caught a " +
+                  "top-level exception in main.", t);
+      System.exit(2);
+    }
+    System.exit(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
new file mode 100644
index 0000000..341db0e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.yarn;
+
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+
+import com.google.common.collect.Sets;
+import java.util.Set;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.util.Records;
+
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The initial launcher for a YARN-based Giraph job. This class attempts to
+ * configure and send a request to the ResourceManager for a single
+ * application container to host GiraphApplicationMaster. The RPC connection
+ * between the RM and GiraphYarnClient is the YARN ApplicationManager.
+ */
+public class GiraphYarnClient extends YarnClientImpl {
+  static {
+    Configuration.addDefaultResource("giraph-site.xml");
+  }
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(GiraphYarnClient.class);
+  /** Sleep time between silent progress checks */
+  private static final int JOB_STATUS_INTERVAL_MSECS = 800;
+  /** Memory (in MB) to allocate for our ApplicationMaster container */
+  private static final int YARN_APP_MASTER_MEMORY_MB = 1024;
+
+  /** human-readable job name */
+  private final String jobName;
+  /** Helper configuration from the job */
+  private final GiraphConfiguration giraphConf;
+  /** ApplicationId object (needed for RPC to ResourceManager) */
+  private ApplicationId appId;
+  /** # of sleeps between progress reports to client */
+  private int reportCounter;
+
+  /**
+   * Constructor. Requires caller to hand us a GiraphConfiguration.
+   *
+   * @param giraphConf User-defined configuration
+   * @param jobName User-defined job name
+   */
+  public GiraphYarnClient(GiraphConfiguration giraphConf, String jobName)
+    throws IOException {
+    super();
+    this.reportCounter = 0;
+    this.jobName = jobName;
+    this.appId = null; // can't set this until after start()
+    this.giraphConf = giraphConf;
+    verifyOutputDirDoesNotExist();
+    super.init(this.giraphConf);
+  }
+
+  /**
+   * Submit a request to the Hadoop YARN cluster's ResourceManager
+   * to obtain an application container. This will run our ApplicationMaster,
+   * which will in turn request app containers for Giraphs' master and all
+   * worker tasks.
+   * @param verbose Not implemented yet, to provide compatibility w/GiraphJob
+   * @return true if job is successful
+   */
+  public boolean run(final boolean verbose) {
+    checkJobLocalZooKeeperSupported();
+    // init our connection to YARN ResourceManager RPC
+    start();
+    // request an application id from the RM
+    GetNewApplicationResponse getNewAppResponse;
+    try {
+      getNewAppResponse = super.getNewApplication();
+      // make sure we have the cluster resources to run the job.
+      checkPerNodeResourcesAvailable(getNewAppResponse);
+    } catch (YarnRemoteException yre) {
+      yre.printStackTrace();
+      return false;
+    }
+    appId = getNewAppResponse.getApplicationId();
+    LOG.info("Obtained new Application ID: " + appId);
+    // sanity check
+    applyConfigsForYarnGiraphJob();
+    // configure our request for an exec container for GiraphApplicationMaster
+    ApplicationSubmissionContext appContext = createAppSubmissionContext();
+    ContainerLaunchContext containerContext = buildContainerLaunchContext();
+    appContext.setAMContainerSpec(containerContext);
+    LOG.info("ApplicationSumbissionContext for GiraphApplicationMaster " +
+      "launch container is populated.");
+    // make the request, blow up if fail, loop and report job progress if not
+    try {
+      // obtain an "updated copy" of the appId for status checks/job kill later
+      appId = super.submitApplication(appContext);
+    } catch (YarnRemoteException yre) {
+      throw new RuntimeException("submitApplication(appContext) FAILED.", yre);
+    }
+    LOG.info("GiraphApplicationMaster container request was submitted to " +
+      "ResourceManager for job: " + jobName);
+    return awaitGiraphJobCompletion();
+  }
+
+  /**
+   * Without Hadoop MR to check for us, make sure the output dir doesn't exist!
+   */
+  private void verifyOutputDirDoesNotExist() {
+    Path outDir = null;
+    try {
+      FileSystem fs = FileSystem.get(giraphConf);
+      String errorMsg = "__ERROR_NO_OUTPUT_DIR_SET__";
+      outDir =
+        new Path(fs.getHomeDirectory(), giraphConf.get(OUTDIR, errorMsg));
+      FileStatus outStatus = fs.getFileStatus(outDir);
+      if (outStatus.isDirectory() || outStatus.isFile() ||
+        outStatus.isSymlink()) {
+        throw new IllegalStateException("Path " + outDir + " already exists.");
+      }
+    } catch (IOException ioe) {
+      LOG.info("Final output path is: " + outDir);
+    }
+  }
+
+  /**
+   * Configuration settings we need to customize for a Giraph on YARN
+   * job. We need to call this EARLY in the job, before the GiraphConfiguration
+   * is exported to HDFS for localization in each task container.
+   */
+  private void applyConfigsForYarnGiraphJob() {
+    GiraphConstants.IS_PURE_YARN_JOB.set(giraphConf, true);
+    GiraphConstants.SPLIT_MASTER_WORKER.set(giraphConf, true);
+    giraphConf.set("mapred.job.id", "giraph_yarn_" + appId); // ZK app base path
+  }
+
+  /**
+   * Utility to make sure we have the cluster resources we need to run this
+   * job. If they are not available, we should die here before too much setup.
+   * @param cluster the GetNewApplicationResponse from the YARN RM.
+   */
+  private void checkPerNodeResourcesAvailable(
+    final GetNewApplicationResponse cluster) {
+    // are there enough containers to go around for our Giraph job?
+    List<NodeReport> nodes = null;
+    int numContainers = 0;
+    long totalAvailable = 0;
+    try {
+      nodes = super.getNodeReports();
+    } catch (YarnRemoteException yre) {
+      throw new RuntimeException("GiraphYarnClient could not connect with " +
+        "the YARN ResourceManager to determine the number of available " +
+        "application containers.", yre);
+    }
+    for (NodeReport node : nodes) {
+      numContainers += node.getNumContainers();
+      totalAvailable += node.getCapability().getMemory();
+    }
+    // 1 master + all workers in -w command line arg
+    final int workers = giraphConf.getMaxWorkers() + 1;
+    if (workers < numContainers) {
+      throw new RuntimeException("Giraph job requires " + workers +
+        " containers to run; cluster only hosts " + numContainers);
+    }
+    checkAndAdjustPerTaskHeapSize(cluster);
+    final long totalAsk =
+      giraphConf.getYarnTaskHeapMb() * workers;
+    if (totalAsk > totalAvailable) {
+      throw new IllegalStateException("Giraph's estimated cluster heap " +
+        totalAsk + "MB ask is greater than the current available cluster " +
+        "heap of " + totalAvailable + "MB. Aborting Job.");
+    }
+  }
+
+  /**
+   * Adjust the user-supplied <code>-yh</code> and <code>-w</code>
+   * settings if they are too small or large for the current cluster,
+   * and re-record the new settings in the GiraphConfiguration for export.
+   * @param gnar the GetNewAppResponse from the YARN ResourceManager.
+   */
+  private void checkAndAdjustPerTaskHeapSize(GetNewApplicationResponse gnar) {
+    // do we have the right heap size on these cluster nodes to run our job?
+    final int minCapacity = gnar.getMinimumResourceCapability().getMemory();
+    final int maxCapacity = gnar.getMaximumResourceCapability().getMemory();
+    // make sure heap size is OK for this cluster's available containers
+    int giraphMem = giraphConf.getYarnTaskHeapMb();
+    if (giraphMem == GiraphConstants.GIRAPH_YARN_TASK_HEAP_MB_DEFAULT) {
+      LOG.info("Defaulting per-task heap size to " + giraphMem + "MB.");
+    }
+    if (giraphMem > maxCapacity) {
+      LOG.info("Giraph's request of heap MB per-task is more than the " +
+        "minimum; downgrading Giraph to" + maxCapacity + "MB.");
+      giraphMem = maxCapacity;
+    }
+    if (giraphMem < minCapacity) {
+      LOG.info("Giraph's request of heap MB per-task is less than the " +
+        "minimum; upgrading Giraph to " + minCapacity + "MB.");
+      giraphMem = minCapacity;
+    }
+    giraphConf.setYarnTaskHeapMb(giraphMem); // record any changes made
+  }
+
+  /**
+   * Kill time for the client, report progress occasionally, and otherwise
+   * just sleep and wait for the job to finish. If no AM response, kill the app.
+   * @return true if job run is successful.
+   */
+  private boolean awaitGiraphJobCompletion() {
+    boolean done;
+    ApplicationReport report = null;
+    try {
+      do {
+        try {
+          Thread.sleep(JOB_STATUS_INTERVAL_MSECS);
+        } catch (InterruptedException ir) {
+          LOG.info("Progress reporter's sleep was interrupted!", ir);
+        }
+        report = super.getApplicationReport(appId);
+        done = checkProgress(report);
+      } while (!done);
+      if (!giraphConf.metricsEnabled()) {
+        cleanupJarCache();
+      }
+    } catch (IOException ex) {
+      final String diagnostics = (null == report) ? "" :
+        "Diagnostics: " + report.getDiagnostics();
+      LOG.error("Fatal fault encountered, failing " + jobName + ". " +
+        diagnostics, ex);
+      try {
+        LOG.error("FORCIBLY KILLING Application from AppMaster.");
+        super.killApplication(appId);
+      } catch (YarnRemoteException yre) {
+        LOG.error("Exception raised in attempt to kill application.", yre);
+      }
+      return false;
+    }
+    return printFinalJobReport();
+  }
+
+  /**
+   * Deletes the HDFS cache in YARN, which replaces DistributedCache of Hadoop.
+   * If metrics are enabled this will not get called (so you can examine cache.)
+   * @throws IOException if bad things happen.
+   */
+  private void cleanupJarCache() throws IOException {
+    FileSystem fs = FileSystem.get(giraphConf);
+    Path baseCacheDir = YarnUtils.getFsCachePath(fs, appId);
+    if (fs.exists(baseCacheDir)) {
+      LOG.info("Cleaning up HDFS distributed cache directory for Giraph job.");
+      fs.delete(baseCacheDir, true); // stuff inside
+      fs.delete(baseCacheDir, false); // dir itself
+    }
+  }
+
+  /**
+   * Print final formatted job report for local client that initiated this run.
+   * @return true for app success, false for failure.
+   */
+  private boolean printFinalJobReport() {
+    ApplicationReport report;
+    try {
+      report = super.getApplicationReport(appId);
+      FinalApplicationStatus finalAppStatus =
+        report.getFinalApplicationStatus();
+      final long secs =
+        (report.getFinishTime() - report.getStartTime()) / 1000L;
+      final String time = String.format("%d minutes, %d seconds.",
+        secs / 60L, secs % 60L);
+      LOG.info("Completed " + jobName + ": " +
+        finalAppStatus.name() + ", total running time: " + time);
+    } catch (YarnRemoteException yre) {
+      LOG.error("Exception encountered while attempting to request " +
+        "a final job report for " + jobName , yre);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Compose the ContainerLaunchContext for the Application Master.
+   * @return the CLC object populated and configured.
+   */
+  private ContainerLaunchContext buildContainerLaunchContext() {
+    ContainerLaunchContext appMasterContainer =
+      Records.newRecord(ContainerLaunchContext.class);
+    appMasterContainer.setEnvironment(buildEnvironment());
+    appMasterContainer.setLocalResources(buildLocalResourceMap());
+    appMasterContainer.setCommands(buildAppMasterExecCommand());
+    appMasterContainer.setResource(buildContainerMemory());
+    appMasterContainer.setUser(ApplicationConstants.Environment.USER.name());
+    return appMasterContainer;
+  }
+
+  /**
+   * Assess whether job is already finished/failed and 'done' flag needs to be
+   * set, prints progress display for client if all is going well.
+   * @param report the application report to assess.
+   * @return true if job report indicates the job run is over.
+   */
+  private boolean checkProgress(final ApplicationReport report) {
+    YarnApplicationState jobState = report.getYarnApplicationState();
+    if (jobState == YarnApplicationState.FINISHED ||
+      jobState == YarnApplicationState.KILLED) {
+      return true;
+    } else if (jobState == YarnApplicationState.FAILED) {
+      LOG.error(jobName + " reports FAILED state, diagnostics show: " +
+        report.getDiagnostics());
+      return true;
+    } else {
+      if (reportCounter++ % 5 == 0) {
+        displayJobReport(report);
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Display a formatted summary of the job progress report from the AM.
+   * @param report the report to display.
+   */
+  private void displayJobReport(final ApplicationReport report) {
+    if (null == report) {
+      throw new IllegalStateException("[*] Latest ApplicationReport for job " +
+        jobName + " was not received by the local client.");
+    }
+    final float elapsed =
+      (System.currentTimeMillis() - report.getStartTime()) / 1000.0f;
+    LOG.info(jobName + ", Elapsed: " + String.format("%.2f secs", elapsed));
+    LOG.info(report.getCurrentApplicationAttemptId() + ", State: " +
+      report.getYarnApplicationState().name() + ", Containers used: " +
+      report.getApplicationResourceUsageReport().getNumUsedContainers());
+  }
+
+  /**
+   * Utility to produce the command line to activate the AM from the shell.
+   * @return A <code>List<String></code> of shell commands to execute in
+   *         the container allocated to us by the RM to host our App Master.
+   */
+  private List<String> buildAppMasterExecCommand() {
+    // 'gam-' prefix is for GiraphApplicationMaster in log file names
+    return ImmutableList.of("${JAVA_HOME}/bin/java " +
+      "-Xmx" + YARN_APP_MASTER_MEMORY_MB + "M " +
+      "-Xms" + YARN_APP_MASTER_MEMORY_MB + "M " + // TODO: REMOVE examples jar!
+      "-cp .:${CLASSPATH} org.apache.giraph.yarn.GiraphApplicationMaster " +
+      "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/gam-stdout.log " +
+      "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/gam-stderr.log "
+    );
+  }
+
+  /**
+   * Check if the job's configuration is for a local run. These can all be
+   * removed as we expand the functionality of the "pure YARN" Giraph profile.
+   */
+  private void checkJobLocalZooKeeperSupported() {
+    final String checkZkList = giraphConf.getZookeeperList();
+    if (checkZkList == null || checkZkList.isEmpty()) {
+      throw new IllegalArgumentException("Giraph on YARN does not currently" +
+        "support Giraph-managed ZK instances: use a standalone ZooKeeper: '" +
+        checkZkList + "'");
+    }
+  }
+
+  /**
+   * Register all local jar files from GiraphConstants.GIRAPH_YARN_LIBJARS
+   * in the LocalResources map, copy to HDFS on that same registered path.
+   * @param map the LocalResources list to populate.
+   */
+  private void addLocalJarsToResourceMap(Map<String, LocalResource> map)
+    throws IOException {
+    Set<String> jars = Sets.newHashSet();
+    String[] libJars = giraphConf.getYarnLibJars().split(",");
+    for (String libJar : libJars) {
+      jars.add(libJar);
+    }
+    FileSystem fs = FileSystem.get(giraphConf);
+    Path baseDir = YarnUtils.getFsCachePath(fs, appId);
+    for (Path jar : YarnUtils.getLocalFiles(jars)) {
+      LOG.info("Located local resource for export at: " + jar);
+      Path dest = new Path(baseDir, jar.getName());
+      fs.copyFromLocalFile(false, true, jar, dest);
+      YarnUtils.addFileToResourceMap(map, fs, dest);
+    }
+  }
+
+  /**
+   * Construct the memory requirements for the AppMaster's container request.
+   * @return A Resource that wraps the memory request.
+   */
+  private Resource buildContainerMemory() {
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(YARN_APP_MASTER_MEMORY_MB);
+    return capability;
+  }
+
+  /**
+   * Create the mapping of environment vars that will be visible to the
+   * ApplicationMaster in its remote app container.
+   * @return a map of environment vars to set up for the AppMaster.
+   */
+  private Map<String, String> buildEnvironment() {
+    Map<String, String> environment =
+      Maps.<String, String>newHashMap();
+    YarnUtils.addLocalClasspathToEnv(environment, giraphConf);
+    // TODO: add java.class.path to env map if running a local YARN minicluster.
+    return environment;
+  }
+
+  /**
+   * Create the mapping of files and JARs to send to the GiraphApplicationMaster
+   * and from there on to the Giraph tasks.
+   * @return the map of jars to local resource paths for transport
+   *         to the host container that will run our AppMaster.
+   */
+  private Map<String, LocalResource> buildLocalResourceMap() {
+    Map<String, LocalResource> localResources =
+        Maps.<String, LocalResource>newHashMap();
+    try {
+      // export the GiraphConfiguration to HDFS for localization to remote tasks
+      YarnUtils.exportGiraphConfiguration(giraphConf, appId);
+      YarnUtils.addGiraphConfToLocalResourceMap(
+        giraphConf, appId, localResources);
+      // add jars from '-yj' cmd-line arg to resource map for localization
+      addLocalJarsToResourceMap(localResources);
+      return localResources;
+    } catch (IOException ioe) {
+      throw new IllegalStateException("Failed to build LocalResouce map.", ioe);
+    }
+  }
+
+  /**
+   * Create the app submission context, and populate it.
+   * @return the populated ApplicationSubmissionContext for the AppMaster.
+   */
+  private ApplicationSubmissionContext createAppSubmissionContext() {
+    ApplicationSubmissionContext appContext =
+      Records.newRecord(ApplicationSubmissionContext.class);
+    appContext.setApplicationId(appId);
+    appContext.setApplicationName(jobName);
+    return appContext;
+  }
+}