You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by er...@apache.org on 2013/04/03 06:17:42 UTC
[1/2] GIRAPH-13: Port Giraph to YARN
Updated Branches:
refs/heads/trunk 67f5f7475 -> b2dff2751
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java
new file mode 100644
index 0000000..d596413
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.yarn;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.GraphTaskManager;
+
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * This process will execute the BSP graph tasks allotted to this YARN
+ * execution container. All tasks will be performed by calling the
+ * GraphTaskManager object. Since this GiraphYarnTask will
+ * not be passing data by key-value pairs through the MR framework, the
+ * Mapper parameter types are irrelevant, and set to <code>Object</code> type.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class GiraphYarnTask<I extends WritableComparable, V extends Writable,
+    E extends Writable, M extends Writable> {
+  static {
+    // Every Configuration created in this JVM also loads the exported
+    // Giraph job XML (GIRAPH_YARN_CONF_FILE) as a default resource.
+    Configuration.addDefaultResource(GiraphConstants.GIRAPH_YARN_CONF_FILE);
+  }
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(GiraphYarnTask.class);
+  /** Manage the framework-agnostic Giraph task for this job run */
+  private final GraphTaskManager<I, V, E, M> graphTaskManager;
+  /** Giraph task ID number must start @ index 0. Used by ZK, BSP, etc. */
+  private final int bspTaskId;
+  /** A special "dummy" override of Mapper#Context, used to deliver MRv1 deps */
+  private final Context proxy;
+  /** Configuration to hand off into Giraph, through wrapper Mapper#Context */
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+
+  /**
+   * Constructor. Build our DUMMY MRv1 data structures to pass to our
+   * GraphTaskManager. This allows us to continue to look the other way
+   * while Giraph relies on MRv1 under the hood.
+   * @param taskAttemptId the MRv1 TaskAttemptID we constructed from CLI args
+   *                      supplied by GiraphApplicationMaster.
+   */
+  public GiraphYarnTask(final TaskAttemptID taskAttemptId) {
+    conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+        new GiraphConfiguration());
+    bspTaskId = taskAttemptId.getTaskID().getId();
+    // NOTE(review): presumably read by Giraph internals as the MRv1 task
+    // partition number; confirm against GraphTaskManager.
+    conf.setInt("mapred.task.partition", bspTaskId);
+    // conf and bspTaskId must be set before building the proxy context.
+    proxy = buildProxyMapperContext(taskAttemptId);
+    graphTaskManager = new GraphTaskManager<I, V, E, M>(proxy);
+  }
+
+  /**
+   * Run one Giraph worker (or master) task, hosted in this execution container.
+   * Only when the computation succeeds does the master go on to commit the
+   * job's final output (there is no MapReduce framework here to do it). Any
+   * failure is rethrown as a RuntimeException so that the task is reported
+   * as failed by {@link #main(String[])}.
+   */
+  public void run() {
+    try {
+      graphTaskManager.setup(null); // defaults GTM to "assume fatjar mode"
+      graphTaskManager.execute();
+      graphTaskManager.cleanup();
+    } catch (InterruptedException ie) {
+      // Restore the interrupt status and fail the task instead of silently
+      // falling through and exiting as if the task had succeeded.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(
+          "run() caught an unrecoverable InterruptedException.", ie);
+    } catch (IOException ioe) {
+      throw new RuntimeException(
+          "run() caught an unrecoverable IOException.", ioe);
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (RuntimeException e) {
+      // CHECKSTYLE: resume IllegalCatch
+      // Notify the master quicker if there is worker failure rather than
+      // waiting for ZooKeeper to timeout and delete the ephemeral znodes
+      graphTaskManager.zooKeeperCleanup();
+      graphTaskManager.workerFailureCleanup();
+      throw new RuntimeException(
+          "run: Caught an unrecoverable exception " + e.getMessage(), e);
+    }
+    // YARN: must complete the commit of the final output, Hadoop isn't there.
+    // Reached only on success: a failed job must not commit its output.
+    finalizeYarnJob();
+  }
+
+  /**
+   * Without Hadoop MR to finish the consolidation of all the task output from
+   * each HDFS task tmp dir, it won't get done. YARN has some job finalization
+   * it must do "for us." -- AND must delete "jar cache" in HDFS too!
+   *
+   * Only the master of a pure YARN job with a configured VertexOutputFormat
+   * commits; every other task returns immediately.
+   *
+   * @throws IllegalStateException if the commit fails or is interrupted, so
+   *                               the job is not reported as successful.
+   */
+  private void finalizeYarnJob() {
+    if (!conf.isPureYarnJob() || !graphTaskManager.isMaster() ||
+        conf.getVertexOutputFormatClass() == null) {
+      return;
+    }
+    LOG.info("Master is ready to commit final job output data.");
+    try {
+      VertexOutputFormat vertexOutputFormat =
+          conf.createVertexOutputFormat();
+      OutputCommitter outputCommitter =
+          vertexOutputFormat.getOutputCommitter(proxy);
+      // now we will have our output in OUTDIR if all went well...
+      outputCommitter.commitJob(proxy);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("Interrupted while attempting to " +
+          "obtain OutputCommitter.", ie);
+    } catch (IOException ioe) {
+      throw new IllegalStateException("Master task's attempt to commit " +
+          "output has FAILED.", ioe);
+    }
+    LOG.info("Master has committed the final job output data.");
+  }
+
+  /**
+   * Utility to generate dummy Mapper#Context for use in Giraph internals.
+   * This is the "key hack" to inject MapReduce-related data structures
+   * containing YARN cluster metadata (and our GiraphConf from the AppMaster)
+   * into our Giraph BSP task code.
+   * @param tid the TaskAttemptID to construct this Mapper#Context from.
+   * @return sort of a Mapper#Context if you squint just right.
+   */
+  private Context buildProxyMapperContext(final TaskAttemptID tid) {
+    MapContext mc = new MapContextImpl<Object, Object, Object, Object>(
+        conf, // our Configuration, populated back at the GiraphYarnClient.
+        tid, // our TaskAttemptId, generated w/YARN app, container, attempt IDs
+        null, // RecordReader here will never be used by Giraph
+        null, // RecordWriter here will never be used by Giraph
+        null, // OutputCommitter here will never be used by Giraph
+        new TaskAttemptContextImpl.DummyReporter() { // goes in task logs
+          @Override
+          public void setStatus(String msg) {
+            LOG.info("[STATUS: task-" + bspTaskId + "] " + msg);
+          }
+        },
+        null); // Input split setting here will never be used by Giraph
+
+    // now, we wrap our MapContext ref so we can produce a Mapper#Context
+    WrappedMapper<Object, Object, Object, Object> wrappedMapper =
+        new WrappedMapper<Object, Object, Object, Object>();
+    return wrappedMapper.getMapContext(mc);
+  }
+
+  /**
+   * Handler for uncaught exceptions: logs the exception at FATAL level and
+   * terminates the JVM with exit code 1. Not installed by this class itself.
+   */
+  class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
+    @Override
+    public void uncaughtException(final Thread t, final Throwable e) {
+      LOG.fatal(
+          "uncaughtException: OverrideExceptionHandler on thread " +
+          t.getName() + ", msg = " + e.getMessage() + ", exiting...", e);
+      System.exit(1);
+    }
+  }
+
+  /**
+   * Task entry point. Always terminates the JVM via System#exit: 0 on
+   * success, 2 on any failure (including malformed arguments).
+   * @param args CLI arguments injected by GiraphApplicationMaster to hand off
+   *             job, task, and attempt ID's to this (and every) Giraph task.
+   *             Args should be: <code>AppClusterTimestamp AppId ContainerId
+   *             AppAttemptId</code>
+   */
+  @SuppressWarnings("rawtypes")
+  public static void main(String[] args) {
+    // ALWAYS finish a YARN task or AppMaster with System#exit!!!
+    try {
+      if (args.length != 4) {
+        throw new IllegalStateException("GiraphYarnTask could not construct " +
+            "a TaskAttemptID for the Giraph job from args: " + printArgs(args));
+      }
+      GiraphYarnTask<?, ?, ?, ?> giraphYarnTask =
+          new GiraphYarnTask(getTaskAttemptID(args));
+      giraphYarnTask.run();
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (Throwable t) {
+      // CHECKSTYLE resume IllegalCatch
+      LOG.error("GiraphYarnTask threw a top-level exception, failing task", t);
+      System.exit(2);
+    }
+    System.exit(0);
+  }
+
+  /**
+   * Utility to create a TaskAttemptId we can feed to our fake Mapper#Context.
+   *
+   * NOTE: ContainerId will serve as MR TaskID for Giraph tasks.
+   * YARN container 1 is always AppMaster, so the least container id we will
+   * ever get from YARN for a Giraph task is container id 2. Giraph on MapReduce
+   * tasks must start at index 0. So we SUBTRACT TWO from each container id.
+   *
+   * @param args the command line args, fed to us by GiraphApplicationMaster.
+   * @return the TaskAttemptId object, populated with YARN job data.
+   * @throws NumberFormatException if args[1..3] are not integers.
+   */
+  private static TaskAttemptID getTaskAttemptID(String[] args) {
+    return new TaskAttemptID(
+        args[0], // YARN ApplicationId Cluster Timestamp
+        Integer.parseInt(args[1]), // YARN ApplicationId #
+        TaskID.getTaskType('m'), // Make Giraph think this is a Mapper task.
+        Integer.parseInt(args[2]) - 2, // YARN ContainerId MINUS TWO (see above)
+        Integer.parseInt(args[3])); // YARN AppAttemptId #
+  }
+
+  /**
+   * Utility to help log command line args in the event of an error.
+   * @param args the CLI args.
+   * @return a pretty-print of the input args.
+   */
+  private static String printArgs(String[] args) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < args.length; i++) {
+      sb.append("arg[").append(i).append("] == ").append(args[i]).append(", ");
+    }
+    return sb.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/yarn/YarnUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/YarnUtils.java b/giraph-core/src/main/java/org/apache/giraph/yarn/YarnUtils.java
new file mode 100644
index 0000000..aa042e8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/YarnUtils.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.yarn;
+
+import com.google.common.collect.Sets;
+import java.io.FileOutputStream;
+import java.util.Set;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.log4j.Logger;
+
+/**
+ * Utilities that can only compile with versions of Hadoop that support YARN,
+ * so they live here instead of o.a.g.utils package.
+ */
+public class YarnUtils {
+  /** Class Logger */
+  private static final Logger LOG = Logger.getLogger(YarnUtils.class);
+  /** Default dir on HDFS (or equivalent) where LocalResources are stored */
+  private static final String HDFS_RESOURCE_DIR = "giraph_yarn_jar_cache";
+
+  /** Private constructor, this is a utility class only */
+  private YarnUtils() { /* no-op */ }
+
+  /**
+   * Populates the LocalResources map with every (comma-separated) jar named
+   * in the conf under GiraphConstants.GIRAPH_YARN_LIBJARS, plus the exported
+   * GiraphConfiguration XML file. All files are expected to already live
+   * under this application's HDFS cache dir (see {@link #getFsCachePath}).
+   * @param map the LocalResources map to populate.
+   * @param giraphConf the configuration to use to select jars to include.
+   * @param appId the ApplicationId, naming the HDFS base dir for job jars.
+   * @throws IOException if a listed file cannot be stat'ed on the FileSystem.
+   */
+  public static void addFsResourcesToMap(Map<String, LocalResource> map,
+      GiraphConfiguration giraphConf, ApplicationId appId) throws IOException {
+    FileSystem fs = FileSystem.get(giraphConf);
+    Path baseDir = YarnUtils.getFsCachePath(fs, appId);
+    boolean coreJarFound = false;
+    for (String fileName : giraphConf.getYarnLibJars().split(",")) {
+      if (fileName.length() > 0) {
+        Path filePath = new Path(baseDir, fileName);
+        LOG.info("Adding " + fileName + " to LocalResources for export.");
+        if (fileName.contains("giraph-core")) {
+          coreJarFound = true;
+        }
+        addFileToResourceMap(map, fs, filePath);
+      }
+    }
+    if (!coreJarFound) { // OK if you are running giraph-examples-jar-with-deps
+      LOG.warn("Job jars (-yj option) didn't include giraph-core.");
+    }
+    Path confPath = new Path(baseDir, GiraphConstants.GIRAPH_YARN_CONF_FILE);
+    addFileToResourceMap(map, fs, confPath);
+  }
+
+  /**
+   * Utility function to locate local JAR files and other resources
+   * recursively in ".", $HADOOP_HOME and the entries of $CLASSPATH. Entries
+   * that are plain files (e.g. jars listed directly on the CLASSPATH) are
+   * matched by name. Once a file has been found for every name in
+   * <code>fileNames</code>, we stop and return the results.
+   * @param fileNames the file name of the jars, without path information.
+   * @return a set of Paths to the jar files requested in fileNames.
+   */
+  public static Set<Path> getLocalFiles(final Set<String> fileNames) {
+    Set<Path> jarPaths = Sets.newHashSet();
+    // Skip unset env vars rather than searching a directory named "null".
+    String classPath = ".";
+    String hadoopHome = System.getenv("HADOOP_HOME");
+    if (hadoopHome != null && hadoopHome.length() > 0) {
+      classPath += ":" + hadoopHome;
+    }
+    String envClassPath = System.getenv("CLASSPATH");
+    if (envClassPath != null && envClassPath.length() > 0) {
+      classPath += ":" + envClassPath;
+    }
+    for (String baseDir : classPath.split(":")) {
+      if (baseDir.length() > 0) {
+        // lose the globbing chars that will fail in File#listFiles
+        final int lastFileSep = baseDir.lastIndexOf("/");
+        if (lastFileSep > 0) {
+          String test = baseDir.substring(lastFileSep);
+          if (test.contains("*")) {
+            baseDir = baseDir.substring(0, lastFileSep);
+          }
+        }
+        File base = new File(baseDir);
+        if (base.isFile()) {
+          // File#listFiles returns null for a file, so match it directly
+          if (fileNames.contains(base.getName())) {
+            jarPaths.add(new Path(base.getAbsolutePath()));
+          }
+        } else {
+          populateJarList(base, jarPaths, fileNames);
+        }
+      }
+      if (foundAllNames(jarPaths, fileNames)) {
+        break; // found a resource for each name in the input set, all done
+      }
+    }
+    return jarPaths;
+  }
+
+  /**
+   * Checks whether every requested file name has at least one match.
+   * @param found the paths located so far.
+   * @param fileNames the file names requested.
+   * @return true if each name in fileNames is the name of some found path.
+   */
+  private static boolean foundAllNames(final Set<Path> found,
+      final Set<String> fileNames) {
+    Set<String> foundNames = Sets.newHashSet();
+    for (Path path : found) {
+      foundNames.add(path.getName());
+    }
+    return foundNames.containsAll(fileNames);
+  }
+
+  /**
+   * Recursively walk <code>dir</code> and collect every regular file whose
+   * name is in <code>fileNames</code>. Unreadable or non-directory inputs
+   * are silently skipped.
+   * @param dir current directory to explore.
+   * @param fileSet the set to populate.
+   * @param fileNames file names to locate.
+   */
+  private static void populateJarList(final File dir,
+      final Set<Path> fileSet, final Set<String> fileNames) {
+    // List once: a second listFiles() call could return null (NPE) and
+    // doubles the directory I/O.
+    File[] filesInThisDir = dir.listFiles();
+    if (null == filesInThisDir) {
+      return;
+    }
+    for (File f : filesInThisDir) {
+      if (f.isDirectory()) {
+        populateJarList(f, fileSet, fileNames);
+      } else if (f.isFile() && fileNames.contains(f.getName())) {
+        fileSet.add(new Path(f.getAbsolutePath()));
+      }
+    }
+  }
+
+  /**
+   * Boilerplate to add a file to the local resources, keyed by its file name,
+   * as an APPLICATION-visible FILE resource.
+   * @param localResources the LocalResources map to populate.
+   * @param fs handle to the HDFS file system.
+   * @param target the file to send to the remote container.
+   * @throws IOException if the target cannot be stat'ed.
+   */
+  public static void addFileToResourceMap(Map<String, LocalResource>
+      localResources, FileSystem fs, Path target)
+    throws IOException {
+    LocalResource resource = Records.newRecord(LocalResource.class);
+    FileStatus destStatus = fs.getFileStatus(target);
+    resource.setResource(ConverterUtils.getYarnUrlFromURI(target.toUri()));
+    resource.setSize(destStatus.getLen());
+    resource.setTimestamp(destStatus.getModificationTime());
+    resource.setType(LocalResourceType.FILE); // use FILE, even for jars!
+    resource.setVisibility(LocalResourceVisibility.APPLICATION);
+    localResources.put(target.getName(), resource);
+    LOG.info("Registered file in LocalResources: " + target.getName());
+  }
+
+  /**
+   * Get the base HDFS dir we will be storing our LocalResources in:
+   * <code>$HOME/giraph_yarn_jar_cache/&lt;appId&gt;</code>.
+   * @param fs the file system.
+   * @param appId the ApplicationId under which our resources will be stored.
+   * @return the path
+   */
+  public static Path getFsCachePath(final FileSystem fs,
+      final ApplicationId appId) {
+    return new Path(fs.getHomeDirectory(), HDFS_RESOURCE_DIR + "/" + appId);
+  }
+
+  /**
+   * Populate the environment string map to be added to the environment vars
+   * in a remote execution container. Adds the local classpath to pick up
+   * "yarn-site.xml" and "mapred-site.xml" stuff.
+   * @param env the map of env var values.
+   * @param giraphConf the GiraphConfiguration to pull values from.
+   */
+  public static void addLocalClasspathToEnv(final Map<String, String> env,
+      final GiraphConfiguration giraphConf) {
+    StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*");
+    for (String cpEntry : giraphConf.getStrings(
+        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+      classPathEnv.append(':').append(cpEntry.trim());
+    }
+    for (String cpEntry : giraphConf.getStrings(
+        MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
+        MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH)) {
+      classPathEnv.append(':').append(cpEntry.trim());
+    }
+    // add the runtime classpath needed for tests to work
+    if (giraphConf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+      String localClassPath = System.getenv("CLASSPATH");
+      if (localClassPath != null) { // avoid appending the literal "null"
+        classPathEnv.append(':').append(localClassPath);
+      }
+    }
+    env.put("CLASSPATH", classPathEnv.toString());
+  }
+
+  /**
+   * Populate the LocalResources list with the GiraphConf XML file's HDFS path.
+   * @param giraphConf the GiraphConfiguration to export for worker tasks.
+   * @param appId the ApplicationId for this YARN app.
+   * @param localResourceMap the LocalResource map of files to export to tasks.
+   * @throws IOException if the conf file cannot be stat'ed.
+   */
+  public static void addGiraphConfToLocalResourceMap(GiraphConfiguration
+      giraphConf, ApplicationId appId, Map<String, LocalResource>
+      localResourceMap) throws IOException {
+    FileSystem fs = FileSystem.get(giraphConf);
+    Path hdfsConfPath = new Path(YarnUtils.getFsCachePath(fs, appId),
+        GiraphConstants.GIRAPH_YARN_CONF_FILE);
+    YarnUtils.addFileToResourceMap(localResourceMap, fs, hdfsConfPath);
+  }
+
+  /**
+   * Export our populated GiraphConfiguration as an XML file (written to
+   * java.io.tmpdir) and copy it into this app's HDFS cache dir, overwriting
+   * any existing copy. Registration with LocalResources is done separately,
+   * e.g. by {@link #addGiraphConfToLocalResourceMap}.
+   * @param giraphConf the current Configuration object to be published.
+   * @param appId the ApplicationId to stamp this app's base HDFS resources dir.
+   * @throws IOException if writing the local file or the HDFS copy fails.
+   */
+  public static void exportGiraphConfiguration(GiraphConfiguration giraphConf,
+      ApplicationId appId) throws IOException {
+    File confFile = new File(System.getProperty("java.io.tmpdir"),
+        GiraphConstants.GIRAPH_YARN_CONF_FILE);
+    if (confFile.exists()) {
+      confFile.delete();
+    }
+    String localConfPath = confFile.getAbsolutePath();
+    FileOutputStream fos = new FileOutputStream(localConfPath);
+    try {
+      giraphConf.writeXml(fos);
+      fos.flush();
+    } finally {
+      fos.close();
+    }
+    // Copy only after the local file is completely written and closed.
+    FileSystem fs = FileSystem.get(giraphConf);
+    Path hdfsConfPath = new Path(YarnUtils.getFsCachePath(fs, appId),
+        GiraphConstants.GIRAPH_YARN_CONF_FILE);
+    fs.copyFromLocalFile(false, true, new Path(localConfPath), hdfsConfPath);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/yarn/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/package-info.java b/giraph-core/src/main/java/org/apache/giraph/yarn/package-info.java
new file mode 100644
index 0000000..c8a3683
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Catch-all package for YARN-specific code. Only compiles under the Maven
+ * hadoop_yarn profile.
+ */
+package org.apache.giraph.yarn;
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java b/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java
new file mode 100644
index 0000000..efd1179
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.yarn;
+
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import junit.framework.Assert;
+import org.apache.commons.io.FileUtils;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.formats.GiraphFileInputFormat;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.io.formats.IntIntNullIntTextInputFormat;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+
+import org.junit.Test;
+
+
+/**
+ * Tests the Giraph on YARN workflow. Basically, the plan is to use a
+ * <code>MiniYARNCluster</code> to run a small test job through our
+ * GiraphYarnClient -> GiraphApplicationMaster -> GiraphYarnTask (2 no-ops)
+ * No "real" BSP code need be tested here, as it is not aware it is running on
+ * YARN once the job is in progress, so the existing MRv1 BSP tests are fine.
+ */
+public class TestYarnJob implements Watcher {
+ private static final Logger LOG = Logger.getLogger(TestYarnJob.class);
+ /**
+ * Simple No-Op vertex to test if we can run a quick Giraph job on YARN.
+ */
+ private static class DummyYarnVertex extends Vertex<IntWritable, IntWritable,
+ NullWritable, IntWritable> {
+ @Override
+ public void compute(Iterable<IntWritable> messages) throws IOException {
+ voteToHalt();
+ }
+ }
+
+ /** job name for this integration test */
+ private static final String JOB_NAME = "giraph-TestPureYarnJob";
+ /** ZooKeeper port to use for tests, avoiding InternalVertexRunner's port */
+ private static final int LOCAL_ZOOKEEPER_PORT = 22183;
+ /** ZooKeeper list system property */
+ private static final String zkList = "localhost:" + LOCAL_ZOOKEEPER_PORT;
+ /** Local ZK working dir, avoid InternalVertexRunner naming */
+ private static final String zkDirName = "_bspZooKeeperYarn";
+ /** Local ZK Manager working dir, avoid InternalVertexRunner naming */
+ private static final String zkMgrDirName = "_defaultZooKeeperManagerYarn";
+
+ /** Temp ZK base working dir for integration test */
+ private File testBaseDir = null;
+ /** Fake input dir for integration test */
+ private File inputDir = null;
+ /** Fake output dir for integration test */
+ private File outputDir = null;
+ /** Temp ZK working dir for integration test */
+ private File zkDir = null;
+ /** Temp ZK Manager working dir for integration test */
+ private File zkMgrDir = null;
+ /** Internal ZooKeeper instance for integration test run */
+ private InternalZooKeeper zookeeper;
+ /** For running the ZK instance locally */
+ private ExecutorService exec = Executors.newSingleThreadExecutor();
+ /** GiraphConfiguration for a "fake YARN job" */
+ private GiraphConfiguration conf = null;
+ /** Counter for # of znode events during integration test */
+ private int zkEventCount = 0;
+ /** Our YARN test cluster for local integration test */
+ private MiniYARNCluster cluster = null;
+
+ @Test
+ public void testPureYarnJob() {
+ try {
+ setupYarnConfiguration();
+ initLocalZookeeper();
+ initYarnCluster();
+ GiraphYarnClient testGyc = new GiraphYarnClient(conf, JOB_NAME);
+ Assert.assertTrue(testGyc.run(true));
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Caught exception in TestYarnJob: " + e);
+ } finally {
+ zookeeper.end();
+ exec.shutdown();
+ cluster.stop();
+ deleteTempDirectories();
+ }
+ }
+
+ /**
+ * Logging this stuff will help you debug integration test issues.
+ * @param zkEvent incoming event for our current test ZK's znode tree.
+ */
+ @Override
+ public void process(WatchedEvent zkEvent) {
+ String event = zkEvent == null ? "NULL" : zkEvent.toString();
+ LOG.info("TestYarnJob observed ZK event: " + event +
+ " for a total of " + (++zkEventCount) + " so far.");
+ }
+
+ /**
+ * Delete our temp dir so checkstyle and rat plugins are happy.
+ */
+ private void deleteTempDirectories() {
+ try {
+ if (testBaseDir != null && testBaseDir.exists()) {
+ FileUtils.deleteDirectory(testBaseDir);
+ }
+ } catch (IOException ioe) {
+ LOG.error("TestYarnJob#deleteTempDirectories() FAIL at: " + testBaseDir);
+ }
+ }
+
+ /**
+ * Initialize a local ZK instance for our test run.
+ */
+ private void initLocalZookeeper() throws IOException {
+ zookeeper = new InternalZooKeeper();
+ exec.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // Configure a local zookeeper instance
+ Properties zkProperties = generateLocalZkProperties();
+ QuorumPeerConfig qpConfig = new QuorumPeerConfig();
+ qpConfig.parseProperties(zkProperties);
+ // run the zookeeper instance
+ final ServerConfig zkConfig = new ServerConfig();
+ zkConfig.readFrom(qpConfig);
+ zookeeper.runFromConfig(zkConfig);
+ } catch (QuorumPeerConfig.ConfigException qpcce) {
+ throw new RuntimeException("parse of generated ZK config file " +
+ "has failed.", qpcce);
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new RuntimeException("initLocalZookeeper in TestYarnJob: ", e);
+ }
+ }
+
+ /**
+ * Returns pre-created ZK conf properties for Giraph integration test.
+ * @return the populated properties sheet.
+ */
+ Properties generateLocalZkProperties() {
+ Properties zkProperties = new Properties();
+ zkProperties.setProperty("tickTime", "2000");
+ zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
+ zkProperties.setProperty("clientPort",
+ String.valueOf(LOCAL_ZOOKEEPER_PORT));
+ zkProperties.setProperty("maxClientCnxns", "10000");
+ zkProperties.setProperty("minSessionTimeout", "10000");
+ zkProperties.setProperty("maxSessionTimeout", "100000");
+ zkProperties.setProperty("initLimit", "10");
+ zkProperties.setProperty("syncLimit", "5");
+ zkProperties.setProperty("snapCount", "50000");
+ return zkProperties;
+ }
+ });
+ }
+
+ /**
+ * Set up the GiraphConfiguration settings we need to run a no-op Giraph
+ * job on a MiniYARNCluster as an integration test. Some YARN-specific
+ * flags are set inside GiraphYarnClient and won't need to be set here.
+ */
+ private void setupYarnConfiguration() throws IOException {
+ conf = new GiraphConfiguration();
+ conf.setWorkerConfiguration(1, 1, 100.0f);
+ conf.setMaxMasterSuperstepWaitMsecs(30 * 1000);
+ conf.setEventWaitMsecs(3 * 1000);
+ conf.setYarnLibJars(""); // no need
+ conf.setYarnTaskHeapMb(256); // small since no work to be done
+ conf.setVertexClass(DummyYarnVertex.class);
+ conf.setVertexInputFormatClass(IntIntNullIntTextInputFormat.class);
+ conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+ conf.setNumComputeThreads(1);
+ conf.setMaxTaskAttempts(1);
+ conf.setNumInputSplitsThreads(1);
+ // Giraph on YARN only ever things its running in "non-local" mode
+ conf.setLocalTestMode(false);
+ // this has to happen here before we populate the conf with the temp dirs
+ setupTempDirectories();
+ conf.set(OUTDIR, new Path(outputDir.getAbsolutePath()).toString());
+ GiraphFileInputFormat.addVertexInputPath(conf, new Path(inputDir.getAbsolutePath()));
+ // hand off the ZK info we just created to our no-op job
+ GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.set(conf, 500);
+ conf.setZooKeeperConfiguration(zkList);
+ conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.getAbsolutePath());
+ GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf, zkMgrDir.getAbsolutePath());
+ // without this, our "real" client won't connect w/"fake" YARN cluster
+ conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+ }
+
+ /**
+ * Initialize the temp dir tree for ZK and I/O for no-op integration test.
+ */
+ private void setupTempDirectories() throws IOException {
+ try {
+ testBaseDir =
+ new File(System.getProperty("user.dir"), JOB_NAME);
+ if (testBaseDir.exists()) {
+ testBaseDir.delete();
+ }
+ testBaseDir.mkdir();
+ inputDir = new File(testBaseDir, "yarninput");
+ if (inputDir.exists()) {
+ inputDir.delete();
+ }
+ inputDir.mkdir();
+ File inFile = new File(inputDir, "graph_data.txt");
+ inFile.createNewFile();
+ outputDir = new File(testBaseDir, "yarnoutput");
+ if (outputDir.exists()) {
+ outputDir.delete();
+ } // don't actually produce the output dir, let Giraph On YARN do it
+ zkDir = new File(testBaseDir, zkDirName);
+ if (zkDir.exists()) {
+ zkDir.delete();
+ }
+ zkDir.mkdir();
+ zkMgrDir = new File(testBaseDir, zkMgrDirName);
+ if (zkMgrDir.exists()) {
+ zkMgrDir.delete();
+ }
+ zkMgrDir.mkdir();
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ throw new IOException("from setupTempDirectories: ", ioe);
+ }
+ }
+
+ /**
+ * Initialize the MiniYARNCluster for the integration test.
+ */
+ private void initYarnCluster() {
+ cluster = new MiniYARNCluster(TestYarnJob.class.getName(), 1, 1, 1);
+ cluster.init(new ImmutableClassesGiraphConfiguration(conf));
+ cluster.start();
+ }
+
+ /**
+ * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown
+ */
+ class InternalZooKeeper extends ZooKeeperServerMain {
+ /**
+ * Shutdown the ZooKeeper instance.
+ */
+ void end() {
+ shutdown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/test/resources/capacity-scheduler.xml
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/resources/capacity-scheduler.xml b/giraph-core/src/test/resources/capacity-scheduler.xml
new file mode 100644
index 0000000..5e19b9a
--- /dev/null
+++ b/giraph-core/src/test/resources/capacity-scheduler.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+
+<configuration>
+
+ <property>
+ <name>yarn.scheduler.capacity.root.queues</name>
+ <value>unfunded,default</value>
+ </property>
+
+ <property>
+ <name>yarn.scheduler.capacity.root.capacity</name>
+ <value>100</value>
+ </property>
+
+ <property>
+ <name>yarn.scheduler.capacity.root.unfunded.capacity</name>
+ <value>50</value>
+ </property>
+
+ <property>
+ <name>yarn.scheduler.capacity.root.default.capacity</name>
+ <value>50</value>
+ </property>
+
+</configuration>
+
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-examples/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-examples/pom.xml b/giraph-examples/pom.xml
index 3b6a08c..21e8ccf 100644
--- a/giraph-examples/pom.xml
+++ b/giraph-examples/pom.xml
@@ -104,6 +104,18 @@ under the License.
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/yarn/**</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/yarn/**</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>munge-maven-plugin</artifactId>
</plugin>
@@ -116,6 +128,18 @@ under the License.
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/yarn/**</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/yarn/**</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>munge-maven-plugin</artifactId>
</plugin>
@@ -128,6 +152,18 @@ under the License.
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/yarn/**</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/yarn/**</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>munge-maven-plugin</artifactId>
<configuration>
@@ -152,6 +188,18 @@ under the License.
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/yarn/**</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/yarn/**</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>munge-maven-plugin</artifactId>
<configuration>
@@ -180,6 +228,18 @@ under the License.
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/yarn/**</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/yarn/**</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>munge-maven-plugin</artifactId>
</plugin>
@@ -191,24 +251,119 @@ under the License.
</build>
</profile>
+ <!-- Currently supports hadoop-2.0.3-alpha
+ (see hadoop_yarn profile in giraph-parent POM to change) -->
+ <profile>
+ <id>hadoop_yarn</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.sonatype.plugins</groupId>
+ <artifactId>munge-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
+ <!-- The profiles below do not (yet) include any munge flags -->
<profile>
<id>hadoop_2.0.0</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/yarn/**</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/yarn/**</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</profile>
<profile>
<id>hadoop_2.0.1</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/yarn/**</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/yarn/**</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</profile>
<profile>
<id>hadoop_2.0.2</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/yarn/**</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/yarn/**</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</profile>
<profile>
- <id>hadoop_2.0.3</id>
+ <id>hadoop_2.0.3</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/yarn/**</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/yarn/**</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</profile>
<profile>
<id>hadoop_trunk</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/yarn/**</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/yarn/**</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</profile>
</profiles>
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 446aa2a..c883d41 100644
--- a/pom.xml
+++ b/pom.xml
@@ -496,7 +496,7 @@ under the License.
<exclude>CODE_CONVENTIONS</exclude>
<!-- generated content -->
<exclude>**/target/**</exclude>
- <exclude>_bsp/**</exclude>
+ <exclude>/_bsp/**</exclude>
<exclude>.checkstyle</exclude>
<!-- source control and IDEs -->
<exclude>.reviewboardrc</exclude>
@@ -506,7 +506,9 @@ under the License.
<exclude>.idea/**</exclude>
<exclude>**/*.iml</exclude>
<exclude>**/*.ipr</exclude>
- </excludes>
+ <!-- test resources (for Giraph on YARN profile) -->
+ <exclude>**/test/resources/**</exclude>
+ </excludes>
</configuration>
</plugin>
<plugin>
@@ -749,6 +751,60 @@ under the License.
</dependencies>
</profile>
+ <!-- This profile runs on Hadoop-2.0.3-alpha by default, but does not
+ use Hadoop MapReduce v2 to set up the Giraph job. This means the Giraph
+ worker/master tasks are not Mappers. Tasks are run in YARN-managed execution
+ containers. Internally, the Giraph framework continues to depend on many Hadoop
+ MapReduce classes to perform work. -->
+ <profile>
+ <id>hadoop_yarn</id>
+ <properties>
+ <hadoop.version>2.0.3-alpha</hadoop.version>
+ <munge.symbols>PURE_YARN</munge.symbols>
+ </properties>
+ <dependencies>
+ <!-- sorted lexicographically -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-nodemanager</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-tests</artifactId>
+ <version>${hadoop.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ </dependencies>
+ </profile>
+
<!-- Help keep future Hadoop versions munge-free:
All profiles below are munge-free: avoid introducing any munge
flags on any of the following profiles. -->
@@ -917,7 +973,7 @@ under the License.
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
- <version>1.3.2</version>
+ <version>2.1</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
[2/2] git commit: updated refs/heads/trunk to b2dff27
Posted by er...@apache.org.
GIRAPH-13: Port Giraph to YARN
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b2dff275
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b2dff275
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b2dff275
Branch: refs/heads/trunk
Commit: b2dff2751d8d3d768f788b39089688c18f6c1750
Parents: 67f5f74
Author: Eli Reisman <er...@apache.org>
Authored: Wed Apr 3 00:15:56 2013 -0400
Committer: Eli Reisman <er...@apache.org>
Committed: Wed Apr 3 00:15:56 2013 -0400
----------------------------------------------------------------------
CHANGELOG | 2 +
checkstyle.xml | 5 +-
giraph-core/pom.xml | 159 ++++-
.../main/java/org/apache/giraph/GiraphRunner.java | 15 +-
.../java/org/apache/giraph/bsp/BspInputFormat.java | 5 +-
.../apache/giraph/conf/GiraphConfiguration.java | 65 ++
.../org/apache/giraph/conf/GiraphConstants.java | 17 +-
.../org/apache/giraph/graph/GraphTaskManager.java | 36 +-
.../org/apache/giraph/master/BspServiceMaster.java | 24 +-
.../apache/giraph/utils/ConfigurationUtils.java | 42 +-
.../org/apache/giraph/worker/BspServiceWorker.java | 32 +-
.../giraph/yarn/GiraphApplicationMaster.java | 699 +++++++++++++++
.../org/apache/giraph/yarn/GiraphYarnClient.java | 476 ++++++++++
.../org/apache/giraph/yarn/GiraphYarnTask.java | 240 +++++
.../java/org/apache/giraph/yarn/YarnUtils.java | 241 +++++
.../java/org/apache/giraph/yarn/package-info.java | 22 +
.../java/org/apache/giraph/yarn/TestYarnJob.java | 285 ++++++
.../src/test/resources/capacity-scheduler.xml | 26 +
giraph-examples/pom.xml | 157 ++++-
pom.xml | 62 ++-
20 files changed, 2578 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index e0a82a6..69261d2 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-13: Port Giraph to YARN (ereisman)
+
GIRAPH-600: Create an option to do output during computation (majakabiljo)
GIRAPH-599: Hive IO dependency issues with some Hadoop profiles (nitay via majakabiljo)
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle.xml b/checkstyle.xml
index 370c120..66fd1ad 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -104,7 +104,10 @@
<!-- Switch statements should be complete and with independent cases -->
<module name="FallThrough" />
<module name="MissingSwitchDefault" />
- <module name="RedundantThrows"/>
+ <!-- For the hadoop_yarn profile: checkstyle cannot load some YARN exception classes, so suppress RedundantThrows load errors -->
+ <module name="RedundantThrows">
+ <property name="suppressLoadErrors" value="true" />
+ </module>
<module name="SimplifyBooleanExpression"/>
<module name="SimplifyBooleanReturn"/>
<!-- Only one statement per line allowed -->
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-core/pom.xml b/giraph-core/pom.xml
index 3580d0c..2f473ed 100644
--- a/giraph-core/pom.xml
+++ b/giraph-core/pom.xml
@@ -136,6 +136,18 @@ under the License.
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/yarn/**</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/yarn/**</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>munge-maven-plugin</artifactId>
</plugin>
@@ -148,6 +160,18 @@ under the License.
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/yarn/**</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/yarn/**</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>munge-maven-plugin</artifactId>
</plugin>
@@ -160,6 +184,18 @@ under the License.
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/yarn/**</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/yarn/**</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>munge-maven-plugin</artifactId>
<configuration>
@@ -184,6 +220,18 @@ under the License.
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/yarn/**</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/yarn/**</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>munge-maven-plugin</artifactId>
<configuration>
@@ -212,6 +260,18 @@ under the License.
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/yarn/**</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/yarn/**</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>munge-maven-plugin</artifactId>
</plugin>
@@ -223,24 +283,121 @@ under the License.
</build>
</profile>
+ <!-- Currently supports hadoop-2.0.3-alpha
+ (see hadoop_yarn profile in giraph-parent POM to change) -->
+ <profile>
+ <id>hadoop_yarn</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.sonatype.plugins</groupId>
+ <artifactId>munge-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
+ <!-- Unmunged profiles are below. -->
+
<profile>
<id>hadoop_2.0.0</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/yarn/**</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/yarn/**</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</profile>
<profile>
<id>hadoop_2.0.1</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/yarn/**</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/yarn/**</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
</profile>
<profile>
- <id>hadoop_2.0.2</id>
+ <id>hadoop_2.0.2</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/yarn/**</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/yarn/**</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</profile>
<profile>
<id>hadoop_2.0.3</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/yarn/**</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/yarn/**</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</profile>
<profile>
<id>hadoop_trunk</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/yarn/**</exclude>
+ </excludes>
+ <testExcludes>
+ <exclude>**/yarn/**</exclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</profile>
</profiles>
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
index 5bd5686..1bd79b5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
@@ -21,6 +21,9 @@ import org.apache.commons.cli.CommandLine;
import org.apache.giraph.utils.ConfigurationUtils;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.job.GiraphJob;
+/*if[PURE_YARN]
+import org.apache.giraph.yarn.GiraphYarnClient;
+end[PURE_YARN]*/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
@@ -64,16 +67,26 @@ public class GiraphRunner implements Tool {
* @return job run exit code
*/
public int run(String[] args) throws Exception {
+ if (null == getConf()) { // for YARN profile
+ conf = new Configuration();
+ }
GiraphConfiguration giraphConf = new GiraphConfiguration(getConf());
CommandLine cmd = ConfigurationUtils.parseArgs(giraphConf, args);
if (null == cmd) {
return 0; // user requested help/info printout, don't run a job.
}
+ // set up job for various platforms
final String vertexClassName = args[0];
- GiraphJob job = new GiraphJob(giraphConf, "Giraph: " + vertexClassName);
+ final String jobName = "Giraph: " + vertexClassName;
+ /*if[PURE_YARN]
+ GiraphYarnClient job = new GiraphYarnClient(giraphConf, jobName);
+ else[PURE_YARN]*/
+ GiraphJob job = new GiraphJob(giraphConf, jobName);
prepareHadoopMRJob(job, cmd);
+ /*end[PURE_YARN]*/
+ // run the job, collect results
if (LOG.isDebugEnabled()) {
LOG.debug("Attempting to run Vertex: " + vertexClassName);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
index cc53271..8f88c80 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
@@ -52,13 +52,16 @@ public class BspInputFormat extends InputFormat<Text, Text> {
int maxWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, 0);
boolean splitMasterWorker = GiraphConstants.SPLIT_MASTER_WORKER.get(conf);
int maxTasks = maxWorkers;
- if (splitMasterWorker) {
+ // if this is a YARN job, separate ZK should already be running
+ boolean isYarnJob = GiraphConstants.IS_PURE_YARN_JOB.get(conf);
+ if (splitMasterWorker && !isYarnJob) {
int zkServers = GiraphConstants.ZOOKEEPER_SERVER_COUNT.get(conf);
maxTasks += zkServers;
}
if (LOG.isDebugEnabled()) {
LOG.debug("getMaxTasks: Max workers = " + maxWorkers +
", split master/worker = " + splitMasterWorker +
+ ", is YARN-only job = " + isYarnJob +
", total max tasks = " + maxTasks);
}
return maxTasks;
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 040c26f..8a78313 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -434,6 +434,10 @@ public class GiraphConfiguration extends Configuration
set(ZOOKEEPER_LIST, serverList);
}
+ /**
+ * Get the SPLIT_MASTER_WORKER flag (presumably: whether the master and
+ * the workers run in separate tasks).
+ * @return the SPLIT_MASTER_WORKER value recorded in this Configuration.
+ */
public final boolean getSplitMasterWorker() {
final boolean split = GiraphConstants.SPLIT_MASTER_WORKER.get(this);
return split;
}
@@ -475,6 +479,50 @@ public class GiraphConfiguration extends Configuration
}
/**
+ * Whether this is a "pure YARN" Giraph job, whose tasks run directly in
+ * YARN containers, as opposed to a job whose tasks are Mappers managed by
+ * a MapReduce (v1 or v2) layer.
+ * @return true if this is a pure YARN job.
+ */
+ public boolean isPureYarnJob() {
+ final boolean pureYarn = GiraphConstants.IS_PURE_YARN_JOB.get(this);
+ return pureYarn;
+ }
+
+ /**
+ * Jar names (file names only, no paths) shipped to the cluster for a
+ * "pure YARN" job. The list must be complete, Giraph framework jar(s)
+ * included.
+ * @return the comma-separated list of jar names for export to cluster.
+ */
+ public String getYarnLibJars() {
+ final String jarList = GiraphConstants.GIRAPH_YARN_LIBJARS.get(this);
+ return jarList;
+ }
+
+ /**
+ * Record the jar names to ship to the cluster for a "pure YARN" job.
+ * @param jarList a comma-separated list of jar names
+ */
+ public void setYarnLibJars(final String jarList) {
+ GiraphConstants.GIRAPH_YARN_LIBJARS.set(this, jarList);
+ }
+
+ /**
+ * Heap size, in MB, given to every task of this job when it runs on the
+ * "pure YARN" profile (GIRAPH_YARN_TASK_HEAP_MB_DEFAULT unless set).
+ * @return the heap size for all tasks, in MB
+ */
+ public int getYarnTaskHeapMb() {
+ final int heapMb = GiraphConstants.GIRAPH_YARN_TASK_HEAP_MB.get(this);
+ return heapMb;
+ }
+
+ /**
+ * Choose the heap size, in MB, for every task of this job when it runs on
+ * the "pure YARN" profile.
+ * @param heapMb the heap size for all tasks
+ */
+ public void setYarnTaskHeapMb(final int heapMb) {
+ GiraphConstants.GIRAPH_YARN_TASK_HEAP_MB.set(this, heapMb);
+ }
+
+ /**
* Get the ZooKeeper list.
*
* @return ZooKeeper list of strings, comma separated or null if none set.
@@ -496,10 +544,27 @@ public class GiraphConfiguration extends Configuration
return LOG_THREAD_LAYOUT.get(this);
}
+ /**
+ * Whether this job run is flagged as a local test.
+ * @return the LOCAL_TEST_MODE value recorded in this Configuration
+ */
public boolean getLocalTestMode() {
final boolean localTest = LOCAL_TEST_MODE.get(this);
return localTest;
}
+ /**
+ * Record whether this job run is a local test.
+ * @param flag true to mark this job as a local test run
+ */
+ public void setLocalTestMode(final boolean flag) {
+ LOCAL_TEST_MODE.set(this, flag);
+ }
+
+ /**
+ * Size of the ZooKeeper quorum (number of ZK server tasks) for this job
+ * run.
+ * @return the number of ZK servers in the quorum
+ */
public int getZooKeeperServerCount() {
final int zkServers = GiraphConstants.ZOOKEEPER_SERVER_COUNT.get(this);
return zkServers;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index eaa8363..730fa5b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -141,8 +141,21 @@ public interface GiraphConstants {
BooleanConfOption VERTEX_OUTPUT_FORMAT_THREAD_SAFE =
new BooleanConfOption("giraph.vertexOutputFormatThreadSafe", false);
- /** Output Format Path (for Giraph-on-YARN) */
- String GIRAPH_OUTPUT_DIR = "giraph.output.dir";
+ /** Conf option: comma-separated list of jar names exported to YARN tasks */
+ StrConfOption GIRAPH_YARN_LIBJARS = new StrConfOption(
+ "giraph.yarn.libjars", "");
+ /** Name of the XML file used to export our Configuration to YARN tasks */
+ String GIRAPH_YARN_CONF_FILE = "giraph-conf.xml";
+ /** Default heap size, in MB, for every task on the YARN profile */
+ int GIRAPH_YARN_TASK_HEAP_MB_DEFAULT = 1024;
+ /** Conf option: user-configurable heap size, in MB, for every YARN task */
+ IntConfOption GIRAPH_YARN_TASK_HEAP_MB =
+ new IntConfOption("giraph.yarn.task.heap.mb",
+ GIRAPH_YARN_TASK_HEAP_MB_DEFAULT);
+ /** Default YARN priority level for our task containers */
+ int GIRAPH_YARN_PRIORITY = 10;
+ /** Conf option: pure YARN job, i.e. no MapReduce layer managing tasks? */
+ BooleanConfOption IS_PURE_YARN_JOB = new BooleanConfOption(
+ "giraph.pure.yarn.job", false);
/** Vertex index class */
ClassConfOption<WritableComparable> VERTEX_ID_CLASS =
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 3ae5ed3..8ed44e8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -94,10 +94,11 @@ import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_CLASS;
public class GraphTaskManager<I extends WritableComparable, V extends Writable,
E extends Writable, M extends Writable> implements
ResetSuperstepMetricsObserver {
- static {
+ // Active only in non-PURE_YARN builds (munge if_not directive): registers
+ // giraph-site.xml as a Hadoop default configuration resource.
+ /*if_not[PURE_YARN]
+ static { // TODO(review): remove? Even MRv1 tasks may not need it here.
Configuration.addDefaultResource("giraph-site.xml");
}
-
+ end[PURE_YARN]*/
/** Name of metric for superstep time in msec */
public static final String TIMER_SUPERSTEP_TIME = "superstep-time-ms";
/** Name of metric for compute on all vertices in msec */
@@ -157,6 +158,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
private String serverPortList;
/** The Hadoop Mapper#Context for this job */
private Mapper<?, ?, ?, ?>.Context context;
+ /** is this GraphTaskManager the master? */
+ private boolean isMaster;
/**
* Default constructor for GiraphTaskManager.
@@ -165,6 +168,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
*/
public GraphTaskManager(Mapper<?, ?, ?, ?>.Context context) {
this.context = context;
+ this.isMaster = false;
}
/**
@@ -174,9 +178,9 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
public void setup(Path[] zkPathList)
throws IOException, InterruptedException {
context.setStatus("setup: Beginning worker setup.");
- determineClassTypes(context.getConfiguration());
conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
context.getConfiguration());
+ determineClassTypes(conf);
// configure global logging level for Giraph job
initializeAndConfigureLogging();
// init the metrics objects
@@ -302,6 +306,23 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
}
/**
+ * Record whether this task is the master, so that the final output commit
+ * can happen on the master. Used by the "pure YARN" profile only.
+ * @param im true if this task is the master
+ */
+ public void setIsMaster(final boolean im) {
+ isMaster = im;
+ }
+
+ /**
+ * Whether this task is the master. The "finally" block of
+ * GiraphYarnTask#execute() needs this to decide whether to commit the
+ * final job output.
+ * @return true if this task IS the master.
+ */
+ public boolean isMaster() {
+ return this.isMaster;
+ }
+
+ /**
* Produce a reference to the "start" superstep timer for the current
* superstep.
* @param superstep the current superstep count
@@ -455,7 +476,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
/**
* Copied from JobConf to get the location of this jar. Workaround for
- * things like Oozie map-reduce jobs.
+ * things like Oozie map-reduce jobs. NOTE: Pure YARN profile cannot
+ * make use of this, as the jars are unpacked at each container site.
*
* @param myClass Class to search the class loader path for to locate
* the relevant jar file
@@ -574,7 +596,6 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
private void locateZookeeperClasspath(Path[] fileClassPaths)
throws IOException {
if (!conf.getLocalTestMode()) {
- //Path[] fileClassPaths = DistributedCache.getLocalCacheArchives(conf);
String zkClasspath = null;
if (fileClassPaths == null) {
if (LOG.isInfoEnabled()) {
@@ -584,7 +605,10 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
if (jarFile == null) {
jarFile = findContainingJar(getClass());
}
- zkClasspath = jarFile.replaceFirst("file:", "");
+ // Pure YARN profiles will use unpacked resources, so calls
+ // to "findContainingJar()" in that context can return NULL!
+ zkClasspath = null == jarFile ?
+ "./*" : jarFile.replaceFirst("file:", "");
} else {
StringBuilder sb = new StringBuilder();
sb.append(fileClassPaths[0]);
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 404e47e..9f4bcbf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -344,15 +344,21 @@ public class BspServiceMaster<I extends WritableComparable,
LOG.fatal("failJob: Killing job " + getJobId());
LOG.fatal("failJob: exception " + e.toString());
try {
- @SuppressWarnings("deprecation")
- org.apache.hadoop.mapred.JobClient jobClient =
+ if (getConfiguration().isPureYarnJob()) {
+ throw new RuntimeException(
+ "BspServiceMaster (YARN profile) is " +
+ "FAILING this task, throwing exception to end job run.", e);
+ } else {
+ @SuppressWarnings("deprecation")
+ org.apache.hadoop.mapred.JobClient jobClient =
new org.apache.hadoop.mapred.JobClient(
- (org.apache.hadoop.mapred.JobConf)
- getContext().getConfiguration());
- @SuppressWarnings("deprecation")
- JobID jobId = JobID.forName(getJobId());
- RunningJob job = jobClient.getJob(jobId);
- job.killJob();
+ (org.apache.hadoop.mapred.JobConf)
+ getContext().getConfiguration());
+ @SuppressWarnings("deprecation")
+ JobID jobId = JobID.forName(getJobId());
+ RunningJob job = jobClient.getJob(jobId);
+ job.killJob();
+ }
} catch (IOException ioe) {
throw new RuntimeException(ioe);
} finally {
@@ -1737,6 +1743,7 @@ public class BspServiceMaster<I extends WritableComparable,
}
if (isMaster) {
+ getGraphTaskManager().setIsMaster(true);
cleanUpZooKeeper();
// If desired, cleanup the checkpoint directory
if (GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.get(conf)) {
@@ -1750,7 +1757,6 @@ public class BspServiceMaster<I extends WritableComparable,
}
}
aggregatorHandler.close();
-
masterClient.closeConnections();
masterServer.close();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index bd30455..9ebe693 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -17,6 +17,9 @@
*/
package org.apache.giraph.utils;
+/*if[PURE_YARN]
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
+end[PURE_YARN]*/
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import org.apache.commons.cli.BasicParser;
@@ -41,6 +44,8 @@ import org.apache.giraph.partition.Partition;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.edge.VertexEdges;
import org.apache.giraph.worker.WorkerContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -57,16 +62,33 @@ public final class ConfigurationUtils {
private static final Logger LOG =
Logger.getLogger(ConfigurationUtils.class);
/** The base path for output dirs as saved in GiraphConfiguration */
- private static final Path BASE_OUTPUT_DIR =
- new Path("hdfs://user/" + System.getenv("USER"));
+  private static final Path BASE_OUTPUT_PATH = locateBaseOutputPath();
+
+  /**
+   * Resolves the base path that relative output dirs ("-op") are placed under:
+   * the current user's home directory on the default FileSystem named by a
+   * freshly loaded Hadoop Configuration (i.e. whatever *-site.xml files are on
+   * the classpath decide, local or remote).
+   * @return the user's home directory on the default FileSystem.
+   * @throws IllegalStateException if the default FileSystem cannot be obtained.
+   */
+  private static Path locateBaseOutputPath() {
+    final FileSystem defaultFs;
+    try {
+      defaultFs = FileSystem.get(new Configuration());
+    } catch (IOException ioe) {
+      throw new IllegalStateException("Error locating default base path!", ioe);
+    }
+    return defaultFs.getHomeDirectory();
+  }
/** Maintains our accepted options in case the caller wants to add some */
private static Options OPTIONS;
+ /*if_not[PURE_YARN]
+ private static String OUTDIR = ""; // no-op placeholder for YARN
+ end[PURE_YARN]*/
+
static {
OPTIONS = new Options();
OPTIONS.addOption("h", "help", false, "Help");
OPTIONS.addOption("la", "listAlgorithms", false, "List supported " +
"algorithms");
OPTIONS.addOption("q", "quiet", false, "Quiet output");
+ OPTIONS.addOption("yj", "yarnjars", true, "comma-separated list of JAR " +
+ "filenames to distribute to Giraph tasks and ApplicationMaster. " +
+ "YARN only. Search order: CLASSPATH, HADOOP_HOME, user current dir.");
+ OPTIONS.addOption("yh", "yarnheap", true, "Heap size, in MB, for each " +
+ "Giraph task (YARN only.) Defaults to " +
+ GiraphConstants.GIRAPH_YARN_TASK_HEAP_MB + " MB.");
OPTIONS.addOption("w", "workers", true, "Number of workers");
OPTIONS.addOption("vif", "vertexInputFormat", true, "Vertex input format");
OPTIONS.addOption("eif", "edgeInputFormat", true, "Edge input format");
@@ -304,11 +326,20 @@ public final class ConfigurationUtils {
}
}
}
+ // YARN-ONLY OPTIONS
+ if (cmd.hasOption("yj")) {
+ giraphConfiguration.setYarnLibJars(cmd.getOptionValue("yj"));
+ }
+ if (cmd.hasOption("yh")) {
+ giraphConfiguration.setYarnTaskHeapMb(
+ Integer.parseInt(cmd.getOptionValue("yh")));
+ }
if (cmd.hasOption("of")) {
if (cmd.hasOption("op")) {
- Path outputDir = new Path(BASE_OUTPUT_DIR, cmd.getOptionValue("op"));
- giraphConfiguration.set(
- GiraphConstants.GIRAPH_OUTPUT_DIR, outputDir.toString());
+ Path outputDir = new Path(BASE_OUTPUT_PATH, cmd.getOptionValue("op"));
+ outputDir = // for YARN conf to get the out dir we need w/o a Job obj
+ outputDir.getFileSystem(giraphConfiguration).makeQualified(outputDir);
+ giraphConfiguration.set(OUTDIR, outputDir.toString());
} else {
if (LOG.isInfoEnabled()) {
LOG.info("No output path specified. Ensure your OutputFormat " +
@@ -316,6 +347,7 @@ public final class ConfigurationUtils {
}
}
}
+ // END YARN-ONLY OPTIONS
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 35db999..2ea91b5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
@@ -978,9 +979,30 @@ else[HADOOP_NON_SECURE]*/
getContext().progress();
++partitionIndex;
}
- vertexWriter.close(getContext());
+ vertexWriter.close(getContext()); // the temp results are saved now
LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
- "saveVertices: Done saving vertices");
+ "saveVertices: Done saving vertices.");
+ // YARN: must complete the commit the "task" output, Hadoop isn't there.
+ if (getConfiguration().isPureYarnJob() &&
+ getConfiguration().getVertexOutputFormatClass() != null) {
+ try {
+ OutputCommitter outputCommitter =
+ vertexOutputFormat.getOutputCommitter(getContext());
+ if (outputCommitter.needsTaskCommit(getContext())) {
+ LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+ "OutputCommitter: committing task output.");
+ // transfer from temp dirs to "task commit" dirs to prep for
+ // the master's OutputCommitter#commitJob(context) call to finish.
+ outputCommitter.commitTask(getContext());
+ }
+ } catch (InterruptedException ie) {
+ LOG.error("Interrupted while attempting to obtain " +
+ "OutputCommitter.", ie);
+ } catch (IOException ioe) {
+ LOG.error("Master task's attempt to commit output has " +
+ "FAILED.", ioe);
+ }
+ }
}
@Override
@@ -1403,6 +1425,12 @@ else[HADOOP_NON_SECURE]*/
"to see if it needs to restart");
}
JSONObject jsonObj = getJobState();
+ // in YARN, we have to manually commit our own output in 2 stages that we
+ // do not have to do in Hadoop-based Giraph. So jsonObj can be null.
+ if (getConfiguration().isPureYarnJob() && null == jsonObj) {
+ LOG.error("BspServiceWorker#getJobState() came back NULL.");
+ return false; // the event has been processed.
+ }
try {
if ((ApplicationState.valueOf(jsonObj.getString(JSONOBJ_STATE_KEY)) ==
ApplicationState.START_SUPERSTEP) &&
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphApplicationMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphApplicationMaster.java b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphApplicationMaster.java
new file mode 100644
index 0000000..c2b88a0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphApplicationMaster.java
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.yarn;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import com.google.common.collect.Maps;
+import java.security.PrivilegedAction;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords
+ .FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords
+ .RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The YARN Application Master for Giraph is launched when the GiraphYarnClient
+ * successfully requests an execution container from the Resource Manager. The
+ * Application Master is provided by Giraph to manage all requests for resources
+ * (worker nodes, memory, jar files, job configuration metadata, etc.) that
+ * Giraph will need to perform the job. When Giraph runs in a non-YARN context,
+ * the role of the Application Master is played by Hadoop when it launches our
+ * GraphMappers (worker/master task nodes) to run the job.
+ */
+public class GiraphApplicationMaster {
+  /** Logger */
+  private static final Logger LOG =
+      Logger.getLogger(GiraphApplicationMaster.class);
+  /** Exit code for YARN containers that were manually killed/aborted */
+  private static final int YARN_ABORT_EXIT_STATUS = -100;
+  /** Exit code for successfully run YARN containers */
+  private static final int YARN_SUCCESS_EXIT_STATUS = 0;
+  /** millis to sleep between heartbeats during long loops */
+  private static final int SLEEP_BETWEEN_HEARTBEATS_MSECS = 900;
+  /**
+   * A reusable map of resources already in HDFS for each task to copy-to-local
+   * env and use to launch each GiraphYarnTask. Built lazily by
+   * {@link #getTaskResourceMap()}, which guards it with the
+   * GiraphApplicationMaster class lock and only publishes it once it is
+   * fully populated.
+   */
+  private static Map<String, LocalResource> LOCAL_RESOURCES;
+
+  /*
+   * Register the conf XML file exported by the YarnClient as a Hadoop default
+   * resource, so every Configuration created in this JVM afterwards (such as
+   * the GiraphConfiguration built in the constructor) picks up the job
+   * settings shipped from the client.
+   */
+  static {
+    Configuration.addDefaultResource(GiraphConstants.GIRAPH_YARN_CONF_FILE);
+  }
+
+  /** Handle to AppMaster's RPC connection to YARN and the RM. */
+  private final AMRMProtocol resourceManager;
+  /** bootstrap handle to YARN RPC service */
+  private final YarnRPC rpc;
+  /** GiraphApplicationMaster's application attempt id */
+  private final ApplicationAttemptId appAttemptId;
+  /** GiraphApplicationMaster container id. Kept for future use. */
+  private final ContainerId containerId;
+  /** number of containers Giraph needs (conf.getMaxWorkers() + 1 master) */
+  private final int containersToLaunch;
+  /** MB of JVM heap per Giraph task container */
+  private final int heapPerContainer;
+  /** Giraph configuration for this job, transported here by YARN framework */
+  private final ImmutableClassesGiraphConfiguration giraphConf;
+  /** Completed Containers Counter */
+  private final AtomicInteger completedCount;
+  /** Failed Containers Counter */
+  private final AtomicInteger failedCount;
+  /** Number of containers the RM has allocated to us so far */
+  private final AtomicInteger allocatedCount;
+  /** Number of successfully completed containers in this job run. */
+  private final AtomicInteger successfulCount;
+  /** the ACK #'s for AllocateRequests + heartbeats == last response # */
+  private final AtomicInteger lastResponseId;
+  /** Executor to attempt asynchronous launches of Giraph containers */
+  private final ExecutorService executor;
+  /** YARN progress is a <code>float</code> between 0.0f and 1.0f */
+  private float progress;
+  /** An empty resource request with which to send heartbeats + progress */
+  private AllocateRequest heartbeat;
+
+  /**
+   * Construct the GiraphAppMaster, populating fields from the job
+   * configuration made available in this execution container, and open the
+   * RPC handle to the Resource Manager.
+   * @param cId the ContainerId of this Application Master's container
+   * @param aId the ApplicationAttemptId of this Application Master
+   * @throws IOException declared for callers; nothing in the body throws it
+   */
+  protected GiraphApplicationMaster(ContainerId cId, ApplicationAttemptId aId)
+    throws IOException {
+    containerId = cId;
+    appAttemptId = aId;
+    progress = 0.0f;
+    lastResponseId = new AtomicInteger(0);
+    giraphConf =
+        new ImmutableClassesGiraphConfiguration(new GiraphConfiguration());
+    completedCount = new AtomicInteger(0);
+    failedCount = new AtomicInteger(0);
+    allocatedCount = new AtomicInteger(0);
+    successfulCount = new AtomicInteger(0);
+    rpc = YarnRPC.create(giraphConf);
+    resourceManager = getHandleToRm();
+    containersToLaunch = giraphConf.getMaxWorkers() + 1;
+    executor = Executors.newFixedThreadPool(containersToLaunch);
+    heapPerContainer = giraphConf.getYarnTaskHeapMb();
+  }
+
+  /**
+   * Coordinates all requests for Giraph's worker/master task containers, and
+   * manages application liveness heartbeat, completion status, teardown, etc.
+   * Always un-registers from the RM on exit, even when a step throws.
+   */
+  private void run() {
+    // register Application Master with the YARN Resource Manager so we can
+    // begin requesting resources.
+    try {
+      resourceManager.registerApplicationMaster(getRegisterAppMasterRequest());
+    } catch (IOException ioe) {
+      throw new IllegalStateException(
+          "GiraphApplicationMaster failed to register with RM.", ioe);
+    }
+
+    try {
+      // Build (and export to HDFS) the shared task resources up front, on
+      // this thread, so a failure aborts the job before any containers are
+      // requested rather than silently killing a launcher thread later.
+      getTaskResourceMap();
+      // Ask for all containers in one request; later heartbeats carry no asks,
+      // so failed containers are not re-requested.
+      AMResponse amResponse = sendAllocationRequest();
+      logClusterResources(amResponse);
+      // loop here, waiting for TOTAL # REQUESTED containers to be available
+      // and launch them piecemeal as they are reported to us in heartbeats.
+      launchContainersAsynchronously(amResponse);
+      // wait for the containers to finish & tally success/fails
+      awaitJobCompletion(); // all launched tasks are done before complete call
+    } finally {
+      // Nothing else shuts the executor down: stop its threads here, forcibly
+      // interrupting any launch still in flight if we got here by a throw.
+      if (null != executor && !executor.isTerminated()) {
+        executor.shutdownNow();
+      }
+      // When the application completes, it should send a "finish request" to RM
+      try {
+        resourceManager.finishApplicationMaster(buildFinishAppMasterRequest());
+      } catch (YarnRemoteException yre) {
+        LOG.error("GiraphApplicationMaster failed to un-register with RM", yre);
+      }
+      if (null != rpc) {
+        rpc.stopProxy(resourceManager, giraphConf);
+      }
+    }
+  }
+
+  /**
+   * Reports the cluster resources in the AM response to our initial ask.
+   * @param amResponse the AM response from YARN.
+   */
+  private void logClusterResources(final AMResponse amResponse) {
+    // Check what the current available resources in the cluster are
+    Resource availableResources = amResponse.getAvailableResources();
+    LOG.info("Initial Giraph resource request for " + containersToLaunch +
+        " containers has been submitted. " +
+        "The RM reports cluster headroom is: " + availableResources);
+  }
+
+  /**
+   * Utility to build the final "job run is finished" request to the RM.
+   * The job is reported SUCCEEDED only if every requested container exited
+   * successfully; otherwise FAILED.
+   * @return the finish app master request, to send to the RM.
+   */
+  private FinishApplicationMasterRequest buildFinishAppMasterRequest() {
+    LOG.info("Application completed. Signalling finish to RM");
+    FinishApplicationMasterRequest finishRequest =
+        Records.newRecord(FinishApplicationMasterRequest.class);
+    finishRequest.setAppAttemptId(appAttemptId);
+    FinalApplicationStatus appStatus;
+    String appMessage = "Container Diagnostics:" +
+        " allocated=" + allocatedCount.get() +
+        ", completed=" + completedCount.get() +
+        ", succeeded=" + successfulCount.get() +
+        ", failed=" + failedCount.get();
+    if (successfulCount.get() == containersToLaunch) {
+      appStatus = FinalApplicationStatus.SUCCEEDED;
+    } else {
+      appStatus = FinalApplicationStatus.FAILED;
+    }
+    finishRequest.setDiagnostics(appMessage);
+    finishRequest.setFinishApplicationStatus(appStatus);
+    return finishRequest;
+  }
+
+  /**
+   * Loop and check the status of the containers until all are finished,
+   * logging how each container meets its end: success, error, or abort.
+   */
+  private void awaitJobCompletion() {
+    List<ContainerStatus> completedContainers;
+    do {
+      try {
+        Thread.sleep(SLEEP_BETWEEN_HEARTBEATS_MSECS);
+      } catch (InterruptedException ignored) {
+        // Deliberately not re-interrupting: a set interrupt flag would make
+        // every later sleep in this loop return at once (busy heartbeating).
+        final int notFinished = containersToLaunch - completedCount.get();
+        LOG.info("GiraphApplicationMaster interrupted from sleep while " +
+            "waiting for " + notFinished + " containers to finish job.");
+      }
+      updateProgress();
+      completedContainers =
+          sendHeartbeat().getAMResponse().getCompletedContainersStatuses();
+      for (ContainerStatus containerStatus : completedContainers) {
+        LOG.info("Got container status for containerID= " +
+            containerStatus.getContainerId() +
+            ", state=" + containerStatus.getState() +
+            ", exitStatus=" + containerStatus.getExitStatus() +
+            ", diagnostics=" + containerStatus.getDiagnostics());
+        switch (containerStatus.getExitStatus()) {
+        case YARN_SUCCESS_EXIT_STATUS:
+          successfulCount.incrementAndGet();
+          break;
+        case YARN_ABORT_EXIT_STATUS:
+          break; // not success or fail
+        default:
+          failedCount.incrementAndGet();
+          break;
+        }
+        completedCount.incrementAndGet();
+      } // end completion check loop
+    } while (completedCount.get() < containersToLaunch);
+  }
+
+  /** Update the progress value for our next heartbeat (allocate request) */
+  private void updateProgress() {
+    // set progress to "half done + ratio of completed containers so far"
+    final float ratio = completedCount.get() / (float) containersToLaunch;
+    progress = 0.5f + ratio / 2.0f;
+  }
+
+  /**
+   * Loop while checking container request status, adding each new bundle of
+   * containers allocated to our executor to launch (run Giraph BSP task) the
+   * job on each. Giraph's full resource request was sent ONCE, but these
+   * containers will become available in groups, over a period of time.
+   * @param amResponse metadata about our AllocateRequest's results; consumed
+   *                   on the first pass, later passes poll via heartbeats.
+   */
+  private void launchContainersAsynchronously(AMResponse amResponse) {
+    List<Container> allocatedContainers;
+    do {
+      // get fresh report on # alloc'd containers, sleep between checks
+      if (null == amResponse) {
+        amResponse = sendHeartbeat().getAMResponse();
+      }
+      allocatedContainers = amResponse.getAllocatedContainers();
+      allocatedCount.addAndGet(allocatedContainers.size());
+      LOG.info("Waiting for task containers: " + allocatedCount.get() +
+          " allocated out of " + containersToLaunch + " required.");
+      startContainerLaunchingThreads(allocatedContainers);
+      amResponse = null;
+      try {
+        Thread.sleep(SLEEP_BETWEEN_HEARTBEATS_MSECS);
+      } catch (InterruptedException ignored) {
+        LOG.info("launchContainerAsynchronously() raised InterruptedException");
+      }
+    } while (containersToLaunch > allocatedCount.get());
+  }
+
+  /**
+   * For each container successfully allocated, attempt to set up and launch
+   * a Giraph worker/master task on the executor.
+   * @param allocatedContainers the containers we have currently allocated.
+   */
+  private void startContainerLaunchingThreads(final List<Container>
+    allocatedContainers) {
+    // first half of the progress range tracks allocation
+    progress = allocatedCount.get() / (2.0f * containersToLaunch);
+    for (Container allocatedContainer : allocatedContainers) {
+      LOG.info("Launching shell command on a new container" +
+          ", containerId=" + allocatedContainer.getId() +
+          ", containerNode=" + allocatedContainer.getNodeId().getHost() +
+          ":" + allocatedContainer.getNodeId().getPort() +
+          ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() +
+          ", containerState=" + allocatedContainer.getState() +
+          ", containerResourceMemory=" +
+          allocatedContainer.getResource().getMemory());
+      // Launch and start the container on a separate thread to keep the main
+      // thread unblocked as all containers may not be allocated at one go.
+      LaunchContainerRunnable launchThread =
+          new LaunchContainerRunnable(allocatedContainer, heapPerContainer);
+      executor.execute(launchThread);
+    }
+  }
+
+  /**
+   * Sends heartbeat messages that include progress amounts. These are in the
+   * form of a YARN AllocateRequest object that asks for 0 resources.
+   * NOTE(review): the request's responseId is incremented before sending
+   * rather than echoing the last response id; confirm against the RM's
+   * responseId protocol for the targeted YARN version.
+   * @return the AllocateResponse, which we may or may not need.
+   * @throws IllegalStateException if the RPC fails or the RM asks us to reboot
+   */
+  private AllocateResponse sendHeartbeat() {
+    heartbeat.setProgress(progress);
+    heartbeat.setResponseId(lastResponseId.incrementAndGet());
+    try {
+      AllocateResponse allocateResponse = resourceManager.allocate(heartbeat);
+      final int responseId = allocateResponse.getAMResponse().getResponseId();
+      if (responseId != lastResponseId.get()) {
+        lastResponseId.set(responseId);
+      }
+      checkForRebootFlag(allocateResponse.getAMResponse());
+      return allocateResponse;
+    } catch (YarnRemoteException yre) {
+      throw new IllegalStateException("sendHeartbeat() failed with " +
+          "YarnRemoteException: ", yre);
+    }
+  }
+
+  /**
+   * Compose and send the allocation request for our Giraph BSP worker/master
+   * compute nodes. Right now the requested containers are identical, mirroring
+   * Giraph's behavior when running on Hadoop MRv1. Giraph could use YARN
+   * to set fine-grained capability to each container, including host choice.
+   * Afterwards the same request object is recycled as the heartbeat.
+   * @return The AM resource descriptor with our container allocations.
+   */
+  private AMResponse sendAllocationRequest() {
+    AllocateRequest allocRequest = Records.newRecord(AllocateRequest.class);
+    try {
+      List<ResourceRequest> containerList = buildResourceRequests();
+      allocRequest.addAllAsks(containerList);
+      List<ContainerId> releasedContainers = Lists.newArrayListWithCapacity(0);
+      allocRequest.setResponseId(lastResponseId.get());
+      allocRequest.setApplicationAttemptId(appAttemptId);
+      allocRequest.addAllReleases(releasedContainers);
+      allocRequest.setProgress(progress);
+      AllocateResponse allocResponse = resourceManager.allocate(allocRequest);
+      AMResponse amResponse = allocResponse.getAMResponse();
+      if (amResponse.getResponseId() != lastResponseId.get()) {
+        lastResponseId.set(amResponse.getResponseId());
+      }
+      checkForRebootFlag(amResponse);
+      // now, make THIS our new HEARTBEAT object, but with ZERO new requests!
+      initHeartbeatRequestObject(allocRequest);
+      return amResponse;
+    } catch (YarnRemoteException yre) {
+      throw new IllegalStateException("Giraph Application Master could not " +
+          "successfully allocate the specified containers from the RM.", yre);
+    }
+  }
+
+  /**
+   * If the YARN RM gets way out of sync with our App Master, its time to
+   * fail the job/restart. Throwing here unwinds to run()'s cleanup.
+   * @param amResponse RPC response from YARN RM to check for reboot flag.
+   * @throws RuntimeException if the response carries the reboot flag
+   */
+  private void checkForRebootFlag(AMResponse amResponse) {
+    if (amResponse.getReboot()) {
+      LOG.error("AMResponse: " + amResponse + " raised YARN REBOOT FLAG!");
+      throw new RuntimeException("AMResponse " + amResponse +
+          " signaled GiraphApplicationMaster with REBOOT FLAG. Failing job.");
+    }
+  }
+
+  /**
+   * Reuses the initial container request (switched to "0 asks" so no new allocs
+   * occur) and sends all heartbeats using that request object.
+   * @param allocRequest the allocation request object to use as heartbeat.
+   */
+  private void initHeartbeatRequestObject(AllocateRequest allocRequest) {
+    allocRequest.clearAsks();
+    allocRequest.addAllAsks(Lists.<ResourceRequest>newArrayListWithCapacity(0));
+    heartbeat = allocRequest;
+  }
+
+  /**
+   * Utility to construct the ResourceRequest for our resource ask: all the
+   * Giraph containers we need, and their memory/priority requirements.
+   * @return a list of ResourceRequests to send (just one, for Giraph tasks)
+   */
+  private List<ResourceRequest> buildResourceRequests() {
+    // set up resource request for our Giraph BSP application
+    ResourceRequest resourceRequest = Records.newRecord(ResourceRequest.class);
+    resourceRequest.setHostName("*"); // hand pick our worker locality someday
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(GiraphConstants.GIRAPH_YARN_PRIORITY);
+    resourceRequest.setPriority(pri);
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setVirtualCores(1); // new YARN API, won't work version < 2.0.3
+    // NOTE(review): container memory equals the task's -Xmx/-Xms, leaving no
+    // headroom for non-heap JVM memory; confirm NM memory limits tolerate it.
+    capability.setMemory(heapPerContainer);
+    resourceRequest.setCapability(capability);
+    resourceRequest.setNumContainers(containersToLaunch);
+    return ImmutableList.of(resourceRequest);
+  }
+
+  /**
+   * Obtain handle to RPC connection to Resource Manager. When security is
+   * enabled, the AM token from the environment is attached to the current
+   * user and the proxy is created as that user.
+   * @return the AMRMProtocol handle to YARN RPC.
+   * @throws IllegalStateException if the UGI or AM token cannot be obtained
+   */
+  private AMRMProtocol getHandleToRm() {
+    YarnConfiguration yarnConf = new YarnConfiguration(giraphConf);
+    final InetSocketAddress rmAddress = yarnConf.getSocketAddr(
+        YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+    LOG.info("Connecting to ResourceManager at " + rmAddress);
+    if (UserGroupInformation.isSecurityEnabled()) {
+      UserGroupInformation currentUser;
+      try {
+        currentUser = UserGroupInformation.getCurrentUser();
+      } catch (IOException ioe) {
+        throw new IllegalStateException("Could not obtain UGI for user.", ioe);
+      }
+      String tokenURLEncodedStr = System.getenv(
+          ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
+      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
+      try {
+        token.decodeFromUrlString(tokenURLEncodedStr);
+      } catch (IOException ioe) {
+        throw new IllegalStateException("Could not decode token from URL", ioe);
+      }
+      SecurityUtil.setTokenService(token, rmAddress);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AppMasterToken is: " + token);
+      }
+      currentUser.addToken(token);
+      return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
+        @Override
+        public AMRMProtocol run() {
+          return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
+              rmAddress, giraphConf);
+        }
+      });
+    } else { // non-secure
+      return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
+          rmAddress, yarnConf);
+    }
+  }
+
+  /**
+   * Get the request to register this Application Master with the RM.
+   * @return the populated AM request.
+   * @throws IllegalStateException if the local hostname cannot be resolved
+   */
+  private RegisterApplicationMasterRequest getRegisterAppMasterRequest() {
+    RegisterApplicationMasterRequest appMasterRequest =
+        Records.newRecord(RegisterApplicationMasterRequest.class);
+    appMasterRequest.setApplicationAttemptId(appAttemptId);
+    try {
+      appMasterRequest.setHost(InetAddress.getLocalHost().getHostName());
+    } catch (UnknownHostException uhe) {
+      throw new IllegalStateException(
+          "Cannot resolve GiraphApplicationMaster's local hostname.", uhe);
+    }
+    // Optional hooks for a future Giraph web UI / tracking endpoint:
+    // appMasterRequest.setRpcPort(appMasterRpcPort);
+    // appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
+    return appMasterRequest;
+  }
+
+  /**
+   * Lazily compose the map of jar and file names to LocalResource records for
+   * inclusion in GiraphYarnTask container requests. Can re-use the same map
+   * as Giraph tasks need identical HDFS-based resources (jars etc.) to run.
+   * Safe to call from several launcher threads: the map is built (and the
+   * conf exported) at most once, under the class lock, and is published only
+   * after it is fully populated. A failed attempt leaves nothing cached.
+   * @return the resource map for a ContainerLaunchContext
+   * @throws IllegalStateException if the resources cannot be set up
+   */
+  private Map<String, LocalResource> getTaskResourceMap() {
+    synchronized (GiraphApplicationMaster.class) {
+      if (null == LOCAL_RESOURCES) {
+        Map<String, LocalResource> resources = Maps.newHashMap();
+        try {
+          // if you have to update the giraphConf for export to tasks, do it now
+          updateGiraphConfForExport();
+          // Set the local resources: just send the copies already in HDFS
+          YarnUtils.addFsResourcesToMap(resources, giraphConf,
+              appAttemptId.getApplicationId());
+        } catch (IOException ioe) {
+          // fail fast, this container will never launch.
+          throw new IllegalStateException("Could not configure the container " +
+              "launch context for GiraphYarnTasks.", ioe);
+        }
+        LOCAL_RESOURCES = resources;
+      }
+      // return the prepopulated copy to reuse for each GiraphYarnTask
+      return LOCAL_RESOURCES;
+    }
+  }
+
+  /**
+   * If you're going to make ANY CHANGES to your local GiraphConfiguration
+   * while running the GiraphApplicationMaster, put them here.
+   * This method replaces the current XML file GiraphConfiguration
+   * stored in HDFS with the copy you have modified locally in-memory.
+   * @throws IOException if republishing the configuration fails
+   */
+  private void updateGiraphConfForExport()
+    throws IOException {
+    // Giraph expects this MapReduce stuff
+    giraphConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
+        appAttemptId.getAttemptId());
+    // now republish the giraph-conf.xml in HDFS
+    YarnUtils.exportGiraphConfiguration(giraphConf,
+        appAttemptId.getApplicationId());
+  }
+
+  /**
+   * Thread to connect to the {@link ContainerManager} and launch the container
+   * that will house one of our Giraph worker (or master) tasks.
+   */
+  private class LaunchContainerRunnable implements Runnable {
+    /** Allocated container */
+    private final Container container;
+    /** Handle to communicate with ContainerManager */
+    private ContainerManager containerManager;
+    /** Heap memory in MB to allocate for this JVM in the launched container */
+    private final int heapSize;
+
+    /**
+     * Constructor.
+     * @param newGiraphTaskContainer Allocated container
+     * @param heapMb the <code>-Xmx</code> setting for each launched task.
+     */
+    public LaunchContainerRunnable(final Container newGiraphTaskContainer,
+      final int heapMb) {
+      this.container = newGiraphTaskContainer;
+      this.heapSize = heapMb;
+    }
+
+    /**
+     * Helper function to connect to the ContainerManager on the node that
+     * hosts this Giraph task's container. The CM starts tasks.
+     */
+    private void connectToCM() {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Connecting to CM for containerid=" + container.getId());
+      }
+      String cmIpPortStr = container.getNodeId().getHost() + ":" +
+          container.getNodeId().getPort();
+      InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
+      LOG.info("Connecting to CM at " + cmIpPortStr);
+      this.containerManager = (ContainerManager)
+          rpc.getProxy(ContainerManager.class, cmAddress, giraphConf);
+    }
+
+    /**
+     * Connects to CM, sets up container launch context
+     * for shell command and eventually dispatches the container
+     * start request to the CM. A failed start is logged, not retried.
+     */
+    @Override
+    public void run() {
+      // Connect to ContainerManager
+      connectToCM();
+      // configure the launcher for the Giraph task it will host
+      StartContainerRequest startReq =
+          Records.newRecord(StartContainerRequest.class);
+      startReq.setContainerLaunchContext(buildContainerLaunchContext());
+      // request CM to start this container as spec'd in ContainerLaunchContext
+      try {
+        containerManager.startContainer(startReq);
+      } catch (YarnRemoteException yre) {
+        LOG.error("StartContainerRequest failed for containerId=" +
+            container.getId(), yre);
+      }
+    }
+
+    /**
+     * Boilerplate to set up the ContainerLaunchContext to tell the Container
+     * Manager how to launch our Giraph task in the execution container we have
+     * already allocated.
+     * @return a populated ContainerLaunchContext object.
+     */
+    private ContainerLaunchContext buildContainerLaunchContext() {
+      LOG.info("Setting up container launch context for containerid=" +
+          container.getId());
+      ContainerLaunchContext launchContext =
+          Records.newRecord(ContainerLaunchContext.class);
+      launchContext.setContainerId(container.getId());
+      launchContext.setResource(container.getResource());
+      // args inject the CLASSPATH, heap MB, and TaskAttemptID for launched task
+      final List<String> commands = generateShellExecCommand();
+      launchContext.setCommands(commands);
+      // add user information to the job; prefer the UGI, then $USER, and only
+      // fall back to the placeholder when neither is available.
+      String jobUserName = "ERROR_UNKNOWN_USER";
+      try {
+        jobUserName = UserGroupInformation.getCurrentUser().getUserName();
+      } catch (IOException ioe) {
+        String envUser =
+            System.getenv(ApplicationConstants.Environment.USER.name());
+        if (null != envUser) {
+          jobUserName = envUser;
+        }
+      }
+      launchContext.setUser(jobUserName);
+      LOG.info("Setting username in ContainerLaunchContext to: " + jobUserName);
+      // Set the environment variables to inject into remote task's container
+      buildEnvironment(launchContext);
+      // Set the local resources: just send the copies already in HDFS
+      launchContext.setLocalResources(getTaskResourceMap());
+      return launchContext;
+    }
+
+    /**
+     * Generates our command line string used to launch our Giraph tasks.
+     * The task's stdout/stderr are redirected to per-container log files.
+     * @return the BASH shell commands to launch the job.
+     */
+    private List<String> generateShellExecCommand() {
+      return ImmutableList.of("java " +
+          "-Xmx" + heapSize + "M " +
+          "-Xms" + heapSize + "M " +
+          "-cp .:${CLASSPATH} " +
+          "org.apache.giraph.yarn.GiraphYarnTask " +
+          appAttemptId.getApplicationId().getClusterTimestamp() + " " +
+          appAttemptId.getApplicationId().getId() + " " +
+          container.getId().getId() + " " +
+          appAttemptId.getAttemptId() + " " +
+          "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+          "/task-" + container.getId().getId() + "-stdout.log " +
+          "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+          "/task-" + container.getId().getId() + "-stderr.log "
+      );
+    }
+
+    /**
+     * Utility to populate the environment vars we wish to inject into the new
+     * container's env when the Giraph BSP task is executed.
+     * @param launchContext the launch context which will set our environment
+     *                      vars in the task's execution container.
+     */
+    private void buildEnvironment(final ContainerLaunchContext launchContext) {
+      Map<String, String> classPathForEnv = Maps.<String, String>newHashMap();
+      // pick up the local classpath so when we instantiate a Configuration
+      // remotely, we also get the "mapred-site.xml" and "yarn-site.xml"
+      YarnUtils.addLocalClasspathToEnv(classPathForEnv, giraphConf);
+      // set this map of env vars into the launch context.
+      launchContext.setEnvironment(classPathForEnv);
+    }
+  }
+
+  /**
+   * Application entry point. Exits with status 0 on success and 2 if any
+   * throwable escapes the Application Master.
+   * @param args command-line args (set by GiraphYarnClient, if any)
+   */
+  public static void main(final String[] args) {
+    String containerIdString =
+        System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV);
+    if (containerIdString == null) {
+      // container id should always be set in the env by the framework
+      throw new IllegalArgumentException("ContainerId not found in env vars.");
+    }
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
+    ApplicationAttemptId appAttemptId = containerId.getApplicationAttemptId();
+    try {
+      GiraphApplicationMaster giraphAppMaster =
+          new GiraphApplicationMaster(containerId, appAttemptId);
+      giraphAppMaster.run();
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (Throwable t) {
+      // CHECKSTYLE: resume IllegalCatch
+      LOG.error("GiraphApplicationMaster caught a " +
+          "top-level exception in main.", t);
+      System.exit(2);
+    }
+    System.exit(0);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
new file mode 100644
index 0000000..341db0e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.yarn;
+
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+
+import com.google.common.collect.Sets;
+import java.util.Set;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.util.Records;
+
+import org.apache.log4j.Logger;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The initial launcher for a YARN-based Giraph job. This class attempts to
+ * configure and send a request to the ResourceManager for a single
+ * application container to host GiraphApplicationMaster. The RPC connection
+ * between the RM and GiraphYarnClient is the YARN ApplicationManager.
+ */
+public class GiraphYarnClient extends YarnClientImpl {
+ static {
+ Configuration.addDefaultResource("giraph-site.xml");
+ }
+ /** Class logger */
+ private static final Logger LOG = Logger.getLogger(GiraphYarnClient.class);
+ /** Sleep time between silent progress checks */
+ private static final int JOB_STATUS_INTERVAL_MSECS = 800;
+ /** Memory (in MB) to allocate for our ApplicationMaster container */
+ private static final int YARN_APP_MASTER_MEMORY_MB = 1024;
+
+ /** human-readable job name */
+ private final String jobName;
+ /** Helper configuration from the job */
+ private final GiraphConfiguration giraphConf;
+ /** ApplicationId object (needed for RPC to ResourceManager) */
+ private ApplicationId appId;
+ /** # of sleeps between progress reports to client */
+ private int reportCounter;
+
+ /**
+ * Constructor. Requires caller to hand us a GiraphConfiguration.
+ *
+ * @param giraphConf User-defined configuration
+ * @param jobName User-defined job name
+ */
+ public GiraphYarnClient(GiraphConfiguration giraphConf, String jobName)
+ throws IOException {
+ super();
+ this.reportCounter = 0;
+ this.jobName = jobName;
+ this.appId = null; // can't set this until after start()
+ this.giraphConf = giraphConf;
+ verifyOutputDirDoesNotExist();
+ super.init(this.giraphConf);
+ }
+
+ /**
+ * Submit a request to the Hadoop YARN cluster's ResourceManager
+ * to obtain an application container. This will run our ApplicationMaster,
+ * which will in turn request app containers for Giraphs' master and all
+ * worker tasks.
+ * @param verbose Not implemented yet, to provide compatibility w/GiraphJob
+ * @return true if job is successful
+ */
+ public boolean run(final boolean verbose) {
+ checkJobLocalZooKeeperSupported();
+ // init our connection to YARN ResourceManager RPC
+ start();
+ // request an application id from the RM
+ GetNewApplicationResponse getNewAppResponse;
+ try {
+ getNewAppResponse = super.getNewApplication();
+ // make sure we have the cluster resources to run the job.
+ checkPerNodeResourcesAvailable(getNewAppResponse);
+ } catch (YarnRemoteException yre) {
+ yre.printStackTrace();
+ return false;
+ }
+ appId = getNewAppResponse.getApplicationId();
+ LOG.info("Obtained new Application ID: " + appId);
+ // sanity check
+ applyConfigsForYarnGiraphJob();
+ // configure our request for an exec container for GiraphApplicationMaster
+ ApplicationSubmissionContext appContext = createAppSubmissionContext();
+ ContainerLaunchContext containerContext = buildContainerLaunchContext();
+ appContext.setAMContainerSpec(containerContext);
+ LOG.info("ApplicationSumbissionContext for GiraphApplicationMaster " +
+ "launch container is populated.");
+ // make the request, blow up if fail, loop and report job progress if not
+ try {
+ // obtain an "updated copy" of the appId for status checks/job kill later
+ appId = super.submitApplication(appContext);
+ } catch (YarnRemoteException yre) {
+ throw new RuntimeException("submitApplication(appContext) FAILED.", yre);
+ }
+ LOG.info("GiraphApplicationMaster container request was submitted to " +
+ "ResourceManager for job: " + jobName);
+ return awaitGiraphJobCompletion();
+ }
+
+ /**
+ * Without Hadoop MR to check for us, make sure the output dir doesn't exist!
+ */
+ private void verifyOutputDirDoesNotExist() {
+ Path outDir = null;
+ try {
+ FileSystem fs = FileSystem.get(giraphConf);
+ String errorMsg = "__ERROR_NO_OUTPUT_DIR_SET__";
+ outDir =
+ new Path(fs.getHomeDirectory(), giraphConf.get(OUTDIR, errorMsg));
+ FileStatus outStatus = fs.getFileStatus(outDir);
+ if (outStatus.isDirectory() || outStatus.isFile() ||
+ outStatus.isSymlink()) {
+ throw new IllegalStateException("Path " + outDir + " already exists.");
+ }
+ } catch (IOException ioe) {
+ LOG.info("Final output path is: " + outDir);
+ }
+ }
+
+ /**
+ * Configuration settings we need to customize for a Giraph on YARN
+ * job. We need to call this EARLY in the job, before the GiraphConfiguration
+ * is exported to HDFS for localization in each task container.
+ */
+ private void applyConfigsForYarnGiraphJob() {
+ GiraphConstants.IS_PURE_YARN_JOB.set(giraphConf, true);
+ GiraphConstants.SPLIT_MASTER_WORKER.set(giraphConf, true);
+ giraphConf.set("mapred.job.id", "giraph_yarn_" + appId); // ZK app base path
+ }
+
+ /**
+ * Utility to make sure we have the cluster resources we need to run this
+ * job. If they are not available, we should die here before too much setup.
+ * @param cluster the GetNewApplicationResponse from the YARN RM.
+ */
+ private void checkPerNodeResourcesAvailable(
+ final GetNewApplicationResponse cluster) {
+ // are there enough containers to go around for our Giraph job?
+ List<NodeReport> nodes = null;
+ int numContainers = 0;
+ long totalAvailable = 0;
+ try {
+ nodes = super.getNodeReports();
+ } catch (YarnRemoteException yre) {
+ throw new RuntimeException("GiraphYarnClient could not connect with " +
+ "the YARN ResourceManager to determine the number of available " +
+ "application containers.", yre);
+ }
+ for (NodeReport node : nodes) {
+ numContainers += node.getNumContainers();
+ totalAvailable += node.getCapability().getMemory();
+ }
+ // 1 master + all workers in -w command line arg
+ final int workers = giraphConf.getMaxWorkers() + 1;
+ if (workers < numContainers) {
+ throw new RuntimeException("Giraph job requires " + workers +
+ " containers to run; cluster only hosts " + numContainers);
+ }
+ checkAndAdjustPerTaskHeapSize(cluster);
+ final long totalAsk =
+ giraphConf.getYarnTaskHeapMb() * workers;
+ if (totalAsk > totalAvailable) {
+ throw new IllegalStateException("Giraph's estimated cluster heap " +
+ totalAsk + "MB ask is greater than the current available cluster " +
+ "heap of " + totalAvailable + "MB. Aborting Job.");
+ }
+ }
+
+ /**
+ * Adjust the user-supplied <code>-yh</code> and <code>-w</code>
+ * settings if they are too small or large for the current cluster,
+ * and re-record the new settings in the GiraphConfiguration for export.
+ * @param gnar the GetNewAppResponse from the YARN ResourceManager.
+ */
+ private void checkAndAdjustPerTaskHeapSize(GetNewApplicationResponse gnar) {
+ // do we have the right heap size on these cluster nodes to run our job?
+ final int minCapacity = gnar.getMinimumResourceCapability().getMemory();
+ final int maxCapacity = gnar.getMaximumResourceCapability().getMemory();
+ // make sure heap size is OK for this cluster's available containers
+ int giraphMem = giraphConf.getYarnTaskHeapMb();
+ if (giraphMem == GiraphConstants.GIRAPH_YARN_TASK_HEAP_MB_DEFAULT) {
+ LOG.info("Defaulting per-task heap size to " + giraphMem + "MB.");
+ }
+ if (giraphMem > maxCapacity) {
+ LOG.info("Giraph's request of heap MB per-task is more than the " +
+ "minimum; downgrading Giraph to" + maxCapacity + "MB.");
+ giraphMem = maxCapacity;
+ }
+ if (giraphMem < minCapacity) {
+ LOG.info("Giraph's request of heap MB per-task is less than the " +
+ "minimum; upgrading Giraph to " + minCapacity + "MB.");
+ giraphMem = minCapacity;
+ }
+ giraphConf.setYarnTaskHeapMb(giraphMem); // record any changes made
+ }
+
+ /**
+ * Kill time for the client, report progress occasionally, and otherwise
+ * just sleep and wait for the job to finish. If no AM response, kill the app.
+ * @return true if job run is successful.
+ */
+ private boolean awaitGiraphJobCompletion() {
+ boolean done;
+ ApplicationReport report = null;
+ try {
+ do {
+ try {
+ Thread.sleep(JOB_STATUS_INTERVAL_MSECS);
+ } catch (InterruptedException ir) {
+ LOG.info("Progress reporter's sleep was interrupted!", ir);
+ }
+ report = super.getApplicationReport(appId);
+ done = checkProgress(report);
+ } while (!done);
+ if (!giraphConf.metricsEnabled()) {
+ cleanupJarCache();
+ }
+ } catch (IOException ex) {
+ final String diagnostics = (null == report) ? "" :
+ "Diagnostics: " + report.getDiagnostics();
+ LOG.error("Fatal fault encountered, failing " + jobName + ". " +
+ diagnostics, ex);
+ try {
+ LOG.error("FORCIBLY KILLING Application from AppMaster.");
+ super.killApplication(appId);
+ } catch (YarnRemoteException yre) {
+ LOG.error("Exception raised in attempt to kill application.", yre);
+ }
+ return false;
+ }
+ return printFinalJobReport();
+ }
+
+ /**
+ * Deletes the HDFS cache in YARN, which replaces DistributedCache of Hadoop.
+ * If metrics are enabled this will not get called (so you can examine cache.)
+ * @throws IOException if bad things happen.
+ */
+ private void cleanupJarCache() throws IOException {
+ FileSystem fs = FileSystem.get(giraphConf);
+ Path baseCacheDir = YarnUtils.getFsCachePath(fs, appId);
+ if (fs.exists(baseCacheDir)) {
+ LOG.info("Cleaning up HDFS distributed cache directory for Giraph job.");
+ fs.delete(baseCacheDir, true); // stuff inside
+ fs.delete(baseCacheDir, false); // dir itself
+ }
+ }
+
+ /**
+ * Print final formatted job report for local client that initiated this run.
+ * @return true for app success, false for failure.
+ */
+ private boolean printFinalJobReport() {
+ ApplicationReport report;
+ try {
+ report = super.getApplicationReport(appId);
+ FinalApplicationStatus finalAppStatus =
+ report.getFinalApplicationStatus();
+ final long secs =
+ (report.getFinishTime() - report.getStartTime()) / 1000L;
+ final String time = String.format("%d minutes, %d seconds.",
+ secs / 60L, secs % 60L);
+ LOG.info("Completed " + jobName + ": " +
+ finalAppStatus.name() + ", total running time: " + time);
+ } catch (YarnRemoteException yre) {
+ LOG.error("Exception encountered while attempting to request " +
+ "a final job report for " + jobName , yre);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Compose the ContainerLaunchContext for the Application Master.
+ * @return the CLC object populated and configured.
+ */
+ private ContainerLaunchContext buildContainerLaunchContext() {
+ ContainerLaunchContext appMasterContainer =
+ Records.newRecord(ContainerLaunchContext.class);
+ appMasterContainer.setEnvironment(buildEnvironment());
+ appMasterContainer.setLocalResources(buildLocalResourceMap());
+ appMasterContainer.setCommands(buildAppMasterExecCommand());
+ appMasterContainer.setResource(buildContainerMemory());
+ appMasterContainer.setUser(ApplicationConstants.Environment.USER.name());
+ return appMasterContainer;
+ }
+
+ /**
+ * Assess whether job is already finished/failed and 'done' flag needs to be
+ * set, prints progress display for client if all is going well.
+ * @param report the application report to assess.
+ * @return true if job report indicates the job run is over.
+ */
+ private boolean checkProgress(final ApplicationReport report) {
+ YarnApplicationState jobState = report.getYarnApplicationState();
+ if (jobState == YarnApplicationState.FINISHED ||
+ jobState == YarnApplicationState.KILLED) {
+ return true;
+ } else if (jobState == YarnApplicationState.FAILED) {
+ LOG.error(jobName + " reports FAILED state, diagnostics show: " +
+ report.getDiagnostics());
+ return true;
+ } else {
+ if (reportCounter++ % 5 == 0) {
+ displayJobReport(report);
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Display a formatted summary of the job progress report from the AM.
+ * @param report the report to display.
+ */
+ private void displayJobReport(final ApplicationReport report) {
+ if (null == report) {
+ throw new IllegalStateException("[*] Latest ApplicationReport for job " +
+ jobName + " was not received by the local client.");
+ }
+ final float elapsed =
+ (System.currentTimeMillis() - report.getStartTime()) / 1000.0f;
+ LOG.info(jobName + ", Elapsed: " + String.format("%.2f secs", elapsed));
+ LOG.info(report.getCurrentApplicationAttemptId() + ", State: " +
+ report.getYarnApplicationState().name() + ", Containers used: " +
+ report.getApplicationResourceUsageReport().getNumUsedContainers());
+ }
+
+ /**
+ * Utility to produce the command line to activate the AM from the shell.
+ * @return A <code>List<String></code> of shell commands to execute in
+ * the container allocated to us by the RM to host our App Master.
+ */
+ private List<String> buildAppMasterExecCommand() {
+ // 'gam-' prefix is for GiraphApplicationMaster in log file names
+ return ImmutableList.of("${JAVA_HOME}/bin/java " +
+ "-Xmx" + YARN_APP_MASTER_MEMORY_MB + "M " +
+ "-Xms" + YARN_APP_MASTER_MEMORY_MB + "M " + // TODO: REMOVE examples jar!
+ "-cp .:${CLASSPATH} org.apache.giraph.yarn.GiraphApplicationMaster " +
+ "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/gam-stdout.log " +
+ "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/gam-stderr.log "
+ );
+ }
+
+ /**
+ * Check if the job's configuration is for a local run. These can all be
+ * removed as we expand the functionality of the "pure YARN" Giraph profile.
+ */
+ private void checkJobLocalZooKeeperSupported() {
+ final String checkZkList = giraphConf.getZookeeperList();
+ if (checkZkList == null || checkZkList.isEmpty()) {
+ throw new IllegalArgumentException("Giraph on YARN does not currently" +
+ "support Giraph-managed ZK instances: use a standalone ZooKeeper: '" +
+ checkZkList + "'");
+ }
+ }
+
+ /**
+ * Register all local jar files from GiraphConstants.GIRAPH_YARN_LIBJARS
+ * in the LocalResources map, copy to HDFS on that same registered path.
+ * @param map the LocalResources list to populate.
+ */
+ private void addLocalJarsToResourceMap(Map<String, LocalResource> map)
+ throws IOException {
+ Set<String> jars = Sets.newHashSet();
+ String[] libJars = giraphConf.getYarnLibJars().split(",");
+ for (String libJar : libJars) {
+ jars.add(libJar);
+ }
+ FileSystem fs = FileSystem.get(giraphConf);
+ Path baseDir = YarnUtils.getFsCachePath(fs, appId);
+ for (Path jar : YarnUtils.getLocalFiles(jars)) {
+ LOG.info("Located local resource for export at: " + jar);
+ Path dest = new Path(baseDir, jar.getName());
+ fs.copyFromLocalFile(false, true, jar, dest);
+ YarnUtils.addFileToResourceMap(map, fs, dest);
+ }
+ }
+
+ /**
+ * Construct the memory requirements for the AppMaster's container request.
+ * @return A Resource that wraps the memory request.
+ */
+ private Resource buildContainerMemory() {
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(YARN_APP_MASTER_MEMORY_MB);
+ return capability;
+ }
+
+ /**
+ * Create the mapping of environment vars that will be visible to the
+ * ApplicationMaster in its remote app container.
+ * @return a map of environment vars to set up for the AppMaster.
+ */
+ private Map<String, String> buildEnvironment() {
+ Map<String, String> environment =
+ Maps.<String, String>newHashMap();
+ YarnUtils.addLocalClasspathToEnv(environment, giraphConf);
+ // TODO: add java.class.path to env map if running a local YARN minicluster.
+ return environment;
+ }
+
+ /**
+ * Create the mapping of files and JARs to send to the GiraphApplicationMaster
+ * and from there on to the Giraph tasks.
+ * @return the map of jars to local resource paths for transport
+ * to the host container that will run our AppMaster.
+ */
+ private Map<String, LocalResource> buildLocalResourceMap() {
+ Map<String, LocalResource> localResources =
+ Maps.<String, LocalResource>newHashMap();
+ try {
+ // export the GiraphConfiguration to HDFS for localization to remote tasks
+ YarnUtils.exportGiraphConfiguration(giraphConf, appId);
+ YarnUtils.addGiraphConfToLocalResourceMap(
+ giraphConf, appId, localResources);
+ // add jars from '-yj' cmd-line arg to resource map for localization
+ addLocalJarsToResourceMap(localResources);
+ return localResources;
+ } catch (IOException ioe) {
+ throw new IllegalStateException("Failed to build LocalResouce map.", ioe);
+ }
+ }
+
+ /**
+ * Create the app submission context, and populate it.
+ * @return the populated ApplicationSubmissionContext for the AppMaster.
+ */
+ private ApplicationSubmissionContext createAppSubmissionContext() {
+ ApplicationSubmissionContext appContext =
+ Records.newRecord(ApplicationSubmissionContext.class);
+ appContext.setApplicationId(appId);
+ appContext.setApplicationName(jobName);
+ return appContext;
+ }
+}