You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by er...@apache.org on 2013/02/05 23:28:59 UTC
git commit: GIRAPH-500: Refactor job launch code out of graph package
and into job package (ereisman)
Updated Branches:
refs/heads/trunk 01b353334 -> 0c77d52f3
GIRAPH-500: Refactor job launch code out of graph package and into job package (ereisman)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/0c77d52f
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/0c77d52f
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/0c77d52f
Branch: refs/heads/trunk
Commit: 0c77d52f3c7eee138f36eeb831b4afab45216b2f
Parents: 01b3533
Author: Eli Reisman <er...@apache.org>
Authored: Tue Feb 5 14:28:32 2013 -0800
Committer: Eli Reisman <er...@apache.org>
Committed: Tue Feb 5 14:28:32 2013 -0800
----------------------------------------------------------------------
CHANGELOG | 2 +
.../io/accumulo/TestAccumuloVertexFormat.java | 2 +-
.../main/java/org/apache/giraph/GiraphRunner.java | 4 +-
.../giraph/benchmark/AggregatorsBenchmark.java | 2 +-
.../apache/giraph/benchmark/PageRankBenchmark.java | 2 +-
.../giraph/benchmark/RandomMessageBenchmark.java | 2 +-
.../giraph/benchmark/ShortestPathsBenchmark.java | 2 +-
.../apache/giraph/conf/GiraphConfiguration.java | 4 +-
.../conf/ImmutableClassesGiraphConfiguration.java | 2 +-
.../giraph/examples/SimpleCheckpointVertex.java | 2 +-
.../examples/SimpleVertexWithWorkerContext.java | 2 +-
.../apache/giraph/graph/DefaultJobObserver.java | 52 ---
.../java/org/apache/giraph/graph/GiraphJob.java | 287 --------------
.../org/apache/giraph/graph/GiraphJobObserver.java | 39 --
.../apache/giraph/graph/GiraphTypeValidator.java | 280 --------------
.../org/apache/giraph/job/DefaultJobObserver.java | 52 +++
.../main/java/org/apache/giraph/job/GiraphJob.java | 289 +++++++++++++++
.../org/apache/giraph/job/GiraphJobObserver.java | 39 ++
.../org/apache/giraph/job/GiraphTypeValidator.java | 282 ++++++++++++++
.../java/org/apache/giraph/job/package-info.java | 21 +
.../apache/giraph/utils/InternalVertexRunner.java | 2 +-
.../src/test/java/org/apache/giraph/BspCase.java | 2 +-
.../java/org/apache/giraph/TestAutoCheckpoint.java | 2 +-
.../test/java/org/apache/giraph/TestBspBasic.java | 2 +-
.../org/apache/giraph/TestGraphPartitioner.java | 2 +-
.../org/apache/giraph/TestManualCheckpoint.java | 2 +-
.../java/org/apache/giraph/TestMutateGraph.java | 2 +-
.../org/apache/giraph/TestNotEnoughMapTasks.java | 2 +-
.../aggregators/TestAggregatorsHandling.java | 2 +-
.../org/apache/giraph/examples/TestPageRank.java | 2 +-
.../org/apache/giraph/io/TestJsonBase64Format.java | 2 +-
.../partition/TestGiraphTransferRegulator.java | 2 +-
.../org/apache/giraph/vertex/TestVertexTypes.java | 2 +-
.../io/hbase/TestHBaseRootMarkerVertextFormat.java | 2 +-
.../giraph/io/hcatalog/HiveGiraphRunner.java | 2 +-
35 files changed, 712 insertions(+), 685 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index cc0ca34..2e25026 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-500: Refactor job launch code out of graph package and into job package (ereisman)
+
GIRAPH-493: Remove EdgeWithSource (nitay)
GIRAPH-429: Number of input split threads set to 1 less than necessary (majakabiljo)
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java
----------------------------------------------------------------------
diff --git a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java
index e68e18d..8894199 100644
--- a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java
+++ b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java
@@ -36,7 +36,7 @@ import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.io.accumulo.edgemarker.AccumuloEdgeInputFormat;
import org.apache.giraph.io.accumulo.edgemarker.AccumuloEdgeOutputFormat;
import org.apache.giraph.vertex.EdgeListVertex;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
index 2e88a83..b6a6113 100644
--- a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
@@ -28,8 +28,8 @@ import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.Combiner;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.examples.Algorithm;
-import org.apache.giraph.graph.GiraphJob;
-import org.apache.giraph.graph.GiraphTypeValidator;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.giraph.job.GiraphTypeValidator;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
index f9a0730..a82a9f8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
@@ -27,7 +27,7 @@ import org.apache.giraph.aggregators.LongSumAggregator;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.giraph.worker.DefaultWorkerContext;
import org.apache.giraph.vertex.EdgeListVertex;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
index 6e49812..7ebcb93 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
@@ -25,7 +25,7 @@ import org.apache.commons.cli.PosixParser;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.combiner.DoubleSumCombiner;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.io.formats.JsonBase64VertexOutputFormat;
import org.apache.giraph.io.formats.PseudoRandomEdgeInputFormat;
import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
index d48aa6d..d0d80af 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
@@ -27,7 +27,7 @@ import org.apache.giraph.aggregators.LongSumAggregator;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.giraph.vertex.EdgeListVertex;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
import org.apache.hadoop.conf.Configuration;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
index 888532d..52bbac4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
@@ -26,7 +26,7 @@ import org.apache.commons.cli.PosixParser;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.combiner.MinimumDoubleCombiner;
import org.apache.giraph.vertex.EdgeListVertex;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 79b12d3..b3b9c4b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -20,8 +20,8 @@ package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.graph.DefaultJobObserver;
-import org.apache.giraph.graph.GiraphJobObserver;
+import org.apache.giraph.job.DefaultJobObserver;
+import org.apache.giraph.job.GiraphJobObserver;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.VertexInputFormat;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 2513d8f..d75d624 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -20,7 +20,7 @@ package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.graph.GiraphJobObserver;
+import org.apache.giraph.job.GiraphJobObserver;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.master.MasterCompute;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java b/giraph-core/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
index ee62c99..337f30e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
+++ b/giraph-core/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
@@ -26,7 +26,7 @@ import org.apache.commons.cli.PosixParser;
import org.apache.giraph.aggregators.LongSumAggregator;
import org.apache.giraph.graph.DefaultEdge;
import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
import org.apache.giraph.master.DefaultMasterCompute;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
index b7605bb..f6488d5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
+++ b/giraph-core/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
@@ -21,7 +21,7 @@ package org.apache.giraph.examples;
import org.apache.giraph.examples.SimpleSuperstepVertex.
SimpleSuperstepVertexInputFormat;
import org.apache.giraph.vertex.EdgeListVertex;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.worker.WorkerContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/graph/DefaultJobObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultJobObserver.java
deleted file mode 100644
index 9f0418a..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultJobObserver.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * Default implementation of job observer that does nothing.
- */
-public class DefaultJobObserver implements GiraphJobObserver,
- ImmutableClassesGiraphConfigurable {
- /** configuration object stored here */
- private ImmutableClassesGiraphConfiguration conf;
-
- @Override
- public void setConf(ImmutableClassesGiraphConfiguration configuration) {
- this.conf = configuration;
- }
-
- @Override
- public ImmutableClassesGiraphConfiguration getConf() {
- return this.conf;
- }
-
- @Override
- public void launchingJob(Job jobToSubmit) {
- // do nothing
- }
-
- @Override
- public void jobFinished(Job jobToSubmit, boolean passed) {
- // do nothing
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java
deleted file mode 100644
index 18c0db2..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.bsp.BspInputFormat;
-import org.apache.giraph.bsp.BspOutputFormat;
-import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.vertex.MutableVertex;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-
-/**
- * Generates an appropriate internal {@link Job} for using Giraph in Hadoop.
- * Uses composition to avoid unwanted {@link Job} methods from exposure
- * to the user.
- */
-public class GiraphJob {
- static {
- Configuration.addDefaultResource("giraph-site.xml");
- }
-
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(GiraphJob.class);
- /** Internal delegated job to proxy interface requests for Job */
- private final DelegatedJob delegatedJob;
- /** Name of the job */
- private final String jobName;
- /** Helper configuration from the job */
- private final GiraphConfiguration giraphConfiguration;
-
- /**
- * Delegated job that simply passes along the class GiraphConfiguration.
- */
- private class DelegatedJob extends Job {
- /** Ensure that for job initiation the super.getConfiguration() is used */
- private boolean jobInited = false;
-
- /**
- * Constructor
- *
- * @throws IOException
- */
- DelegatedJob() throws IOException { }
-
- @Override
- public Configuration getConfiguration() {
- if (jobInited) {
- return giraphConfiguration;
- } else {
- return super.getConfiguration();
- }
- }
- }
-
- /**
- * Constructor that will instantiate the configuration
- *
- * @param jobName User-defined job name
- * @throws IOException
- */
- public GiraphJob(String jobName) throws IOException {
- this(new GiraphConfiguration(), jobName);
- }
-
- /**
- * Constructor.
- *
- * @param configuration User-defined configuration
- * @param jobName User-defined job name
- * @throws IOException
- */
- public GiraphJob(Configuration configuration,
- String jobName) throws IOException {
- this(new GiraphConfiguration(configuration), jobName);
- }
-
- /**
- * Constructor.
- *
- * @param giraphConfiguration User-defined configuration
- * @param jobName User-defined job name
- * @throws IOException
- */
- public GiraphJob(GiraphConfiguration giraphConfiguration,
- String jobName) throws IOException {
- this.jobName = jobName;
- this.giraphConfiguration = giraphConfiguration;
- this.delegatedJob = new DelegatedJob();
- }
-
- /**
- * Get the configuration from the internal job.
- *
- * @return Configuration used by the job.
- */
- public GiraphConfiguration getConfiguration() {
- return giraphConfiguration;
- }
-
- /**
- * Be very cautious when using this method as it returns the internal job
- * of {@link GiraphJob}. This should only be used for methods that require
- * access to the actual {@link Job}, i.e. FileInputFormat#addInputPath().
- *
- * @return Internal job that will actually be submitted to Hadoop.
- */
- public Job getInternalJob() {
- delegatedJob.jobInited = true;
- return delegatedJob;
- }
- /**
- * Make sure the configuration is set properly by the user prior to
- * submitting the job.
- *
- * @param conf Configuration to check
- */
- private void checkConfiguration(ImmutableClassesGiraphConfiguration conf) {
- if (conf.getMaxWorkers() < 0) {
- throw new RuntimeException("checkConfiguration: No valid " +
- GiraphConstants.MAX_WORKERS);
- }
- if (conf.getMinPercentResponded() <= 0.0f ||
- conf.getMinPercentResponded() > 100.0f) {
- throw new IllegalArgumentException(
- "checkConfiguration: Invalid " + conf.getMinPercentResponded() +
- " for " + GiraphConstants.MIN_PERCENT_RESPONDED);
- }
- if (conf.getMinWorkers() < 0) {
- throw new IllegalArgumentException("checkConfiguration: No valid " +
- GiraphConstants.MIN_WORKERS);
- }
- if (conf.getVertexClass() == null) {
- throw new IllegalArgumentException("checkConfiguration: Null" +
- GiraphConstants.VERTEX_CLASS);
- }
- if (conf.getVertexInputFormatClass() == null &&
- conf.getEdgeInputFormatClass() == null) {
- throw new IllegalArgumentException("checkConfiguration: One of " +
- GiraphConstants.VERTEX_INPUT_FORMAT_CLASS + " and " +
- GiraphConstants.EDGE_INPUT_FORMAT_CLASS + " must be non-null");
- }
- if (conf.getEdgeInputFormatClass() != null &&
- !(MutableVertex.class.isAssignableFrom(conf.getVertexClass()))) {
- throw new IllegalArgumentException("checkConfiguration: EdgeInputFormat" +
- " only works with mutable vertices");
- }
- if (conf.getVertexResolverClass() == null) {
- if (LOG.isInfoEnabled()) {
- LOG.info("checkConfiguration: No class found for " +
- GiraphConstants.VERTEX_RESOLVER_CLASS + ", defaulting to " +
- DefaultVertexResolver.class.getCanonicalName());
- }
- }
- }
-
-
- /**
- * Check if the configuration is local. If it is local, do additional
- * checks due to the restrictions of LocalJobRunner.
- *
- * @param conf Configuration
- */
- private static void checkLocalJobRunnerConfiguration(
- GiraphConfiguration conf) {
- String jobTracker = conf.get("mapred.job.tracker", null);
- if (!jobTracker.equals("local")) {
- // Nothing to check
- return;
- }
-
- int maxWorkers = conf.getMaxWorkers();
- if (maxWorkers != 1) {
- throw new IllegalArgumentException(
- "checkLocalJobRunnerConfiguration: When using " +
- "LocalJobRunner, must have only one worker since " +
- "only 1 task at a time!");
- }
- if (conf.getSplitMasterWorker()) {
- throw new IllegalArgumentException(
- "checkLocalJobRunnerConfiguration: When using " +
- "LocalJobRunner, you cannot run in split master / worker " +
- "mode since there is only 1 task at a time!");
- }
- }
-
- /**
- * Check whether a specified int conf value is set and if not, set it.
- *
- * @param param Conf value to check
- * @param defaultValue Assign to value if not set
- */
- private void setIntConfIfDefault(String param, int defaultValue) {
- if (giraphConfiguration.getInt(param, Integer.MIN_VALUE) ==
- Integer.MIN_VALUE) {
- giraphConfiguration.setInt(param, defaultValue);
- }
- }
-
- /**
- * Runs the actual graph application through Hadoop Map-Reduce.
- *
- * @param verbose If true, provide verbose output, false otherwise
- * @return True if success, false otherwise
- * @throws ClassNotFoundException
- * @throws InterruptedException
- * @throws IOException
- */
- public final boolean run(boolean verbose)
- throws IOException, InterruptedException, ClassNotFoundException {
- // Most users won't hit this hopefully and can set it higher if desired
- setIntConfIfDefault("mapreduce.job.counters.limit", 512);
-
- // Capacity scheduler-specific settings. These should be enough for
- // a reasonable Giraph job
- setIntConfIfDefault("mapred.job.map.memory.mb", 1024);
- setIntConfIfDefault("mapred.job.reduce.memory.mb", 1024);
-
- // Speculative execution doesn't make sense for Giraph
- giraphConfiguration.setBoolean(
- "mapred.map.tasks.speculative.execution", false);
-
- // Set the ping interval to 5 minutes instead of one minute
- // (DEFAULT_PING_INTERVAL)
- Client.setPingInterval(giraphConfiguration, 60000 * 5);
-
- // Should work in MAPREDUCE-1938 to let the user jars/classes
- // get loaded first
- giraphConfiguration.setBoolean("mapreduce.user.classpath.first", true);
- giraphConfiguration.setBoolean("mapreduce.job.user.classpath.first", true);
-
- // If the checkpoint frequency is 0 (no failure handling), set the max
- // tasks attempts to be 0 to encourage faster failure of unrecoverable jobs
- if (giraphConfiguration.getCheckpointFrequency() == 0) {
- int oldMaxTaskAttempts = giraphConfiguration.getMaxTaskAttempts();
- giraphConfiguration.setMaxTaskAttempts(0);
- if (LOG.isInfoEnabled()) {
- LOG.info("run: Since checkpointing is disabled (default), " +
- "do not allow any task retries (setting " +
- GiraphConstants.MAX_TASK_ATTEMPTS + " = 0, " +
- "old value = " + oldMaxTaskAttempts + ")");
- }
- }
-
- // Set the job properties, check them, and submit the job
- ImmutableClassesGiraphConfiguration conf =
- new ImmutableClassesGiraphConfiguration(giraphConfiguration);
- checkConfiguration(conf);
- checkLocalJobRunnerConfiguration(conf);
- Job submittedJob = new Job(conf, jobName);
- if (submittedJob.getJar() == null) {
- submittedJob.setJarByClass(getClass());
- }
- submittedJob.setNumReduceTasks(0);
- submittedJob.setMapperClass(GraphMapper.class);
- submittedJob.setInputFormatClass(BspInputFormat.class);
- submittedJob.setOutputFormatClass(BspOutputFormat.class);
-
- GiraphJobObserver jobObserver = conf.getJobObserver();
- jobObserver.launchingJob(submittedJob);
- boolean passed = submittedJob.waitForCompletion(verbose);
- jobObserver.jobFinished(submittedJob, passed);
-
- return passed;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJobObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJobObserver.java
deleted file mode 100644
index 7aec8c2..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJobObserver.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * An observer over the job launch lifecycle.
- */
-public interface GiraphJobObserver {
- /**
- * Callback for job about to start.
- * @param jobToSubmit Job we're going to submit to hadoop.
- */
- void launchingJob(Job jobToSubmit);
-
- /**
- * Callback when job finishes.
- * @param submittedJob Job that ran in hadoop.
- * @param passed true if job succeeded.
- */
- void jobFinished(Job submittedJob, boolean passed);
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java
deleted file mode 100644
index 213a69a..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.bsp.BspUtils;
-import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.io.EdgeInputFormat;
-import org.apache.giraph.io.VertexInputFormat;
-import org.apache.giraph.io.VertexOutputFormat;
-import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.giraph.vertex.Vertex;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import java.lang.reflect.Type;
-import java.util.List;
-
-/**
- * GiraphTypeValidator attempts to verify the consistency of
- * user-chosen InputFormat, OutputFormat, and Vertex type
- * parameters before the job run actually begins.
- *
- * @param <I> the Vertex ID type
- * @param <V> the Vertex Value type
- * @param <E> the Edge Value type
- * @param <M> the Message type
- */
-public class GiraphTypeValidator<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
- /**
- * Class logger object.
- */
- private static Logger LOG =
- Logger.getLogger(GiraphTypeValidator.class);
-
- /** I param vertex index in classList */
- private static final int ID_PARAM_INDEX = 0;
- /** V param vertex index in classList */
- private static final int VALUE_PARAM_INDEX = 1;
- /** E param vertex index in classList */
- private static final int EDGE_PARAM_INDEX = 2;
- /** M param vertex index in classList */
- private static final int MSG_PARAM_INDEX = 3;
- /** M param vertex combiner index in classList */
- private static final int MSG_COMBINER_PARAM_INDEX = 1;
- /** E param edge input format index in classList */
- private static final int EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX = 1;
-
- /** Vertex Index Type */
- private Type vertexIndexType;
- /** Vertex Index Type */
- private Type vertexValueType;
- /** Vertex Index Type */
- private Type edgeValueType;
- /** Vertex Index Type */
- private Type messageValueType;
-
- /**
- * The Configuration object for use in the validation test.
- */
- private Configuration conf;
-
- /**
- * Constructor to execute the validation test, throws
- * unchecked exception to end job run on failure.
- *
- * @param conf the Configuration for this run.
- */
- public GiraphTypeValidator(Configuration conf) {
- this.conf = conf;
- }
-
- /**
- * Make sure that all registered classes have matching types. This
- * is a little tricky due to type erasure, cannot simply get them from
- * the class type arguments. Also, set the vertex index, vertex value,
- * edge value and message value classes.
- */
- public void validateClassTypes() {
- Class<? extends Vertex<I, V, E, M>> vertexClass =
- BspUtils.<I, V, E, M>getVertexClass(conf);
- List<Class<?>> classList = ReflectionUtils.getTypeArguments(
- Vertex.class, vertexClass);
- vertexIndexType = classList.get(ID_PARAM_INDEX);
- vertexValueType = classList.get(VALUE_PARAM_INDEX);
- edgeValueType = classList.get(EDGE_PARAM_INDEX);
- messageValueType = classList.get(MSG_PARAM_INDEX);
- verifyVertexInputFormatGenericTypes();
- verifyEdgeInputFormatGenericTypes();
- verifyVertexOutputFormatGenericTypes();
- verifyVertexResolverGenericTypes();
- verifyVertexCombinerGenericTypes();
- }
-
- /** Verify matching generic types in VertexInputFormat. */
- private void verifyVertexInputFormatGenericTypes() {
- Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass =
- BspUtils.<I, V, E, M>getVertexInputFormatClass(conf);
- if (vertexInputFormatClass != null) {
- List<Class<?>> classList =
- ReflectionUtils.getTypeArguments(
- VertexInputFormat.class, vertexInputFormatClass);
- if (classList.get(ID_PARAM_INDEX) == null) {
- LOG.warn("Input format vertex index type is not known");
- } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex index types don't match, " +
- "vertex - " + vertexIndexType +
- ", vertex input format - " + classList.get(ID_PARAM_INDEX));
- }
- if (classList.get(VALUE_PARAM_INDEX) == null) {
- LOG.warn("Input format vertex value type is not known");
- } else if (!vertexValueType.equals(classList.get(VALUE_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex value types don't match, " +
- "vertex - " + vertexValueType +
- ", vertex input format - " + classList.get(VALUE_PARAM_INDEX));
- }
- if (classList.get(EDGE_PARAM_INDEX) == null) {
- LOG.warn("Input format edge value type is not known");
- } else if (!edgeValueType.equals(classList.get(EDGE_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Edge value types don't match, " +
- "vertex - " + edgeValueType +
- ", vertex input format - " + classList.get(EDGE_PARAM_INDEX));
- }
- }
- }
-
- /** Verify matching generic types in EdgeInputFormat. */
- private void verifyEdgeInputFormatGenericTypes() {
- Class<? extends EdgeInputFormat<I, E>> edgeInputFormatClass =
- BspUtils.<I, E>getEdgeInputFormatClass(conf);
- if (edgeInputFormatClass != null) {
- List<Class<?>> classList =
- ReflectionUtils.getTypeArguments(
- EdgeInputFormat.class, edgeInputFormatClass);
- if (classList.get(ID_PARAM_INDEX) == null) {
- LOG.warn("Input format vertex index type is not known");
- } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex index types don't match, " +
- "vertex - " + vertexIndexType +
- ", edge input format - " + classList.get(ID_PARAM_INDEX));
- }
- if (classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX) == null) {
- LOG.warn("Input format edge value type is not known");
- } else if (!edgeValueType.equals(
- classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Edge value types don't match, " +
- "vertex - " + edgeValueType +
- ", edge input format - " +
- classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX));
- }
- }
- }
-
- /** If there is a combiner type, verify its generic params match the job. */
- private void verifyVertexCombinerGenericTypes() {
- Class<? extends Combiner<I, M>> vertexCombinerClass =
- BspUtils.<I, M>getCombinerClass(conf);
- if (vertexCombinerClass != null) {
- List<Class<?>> classList =
- ReflectionUtils.getTypeArguments(
- Combiner.class, vertexCombinerClass);
- if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex index types don't match, " +
- "vertex - " + vertexIndexType +
- ", vertex combiner - " + classList.get(ID_PARAM_INDEX));
- }
- if (!messageValueType.equals(classList.get(MSG_COMBINER_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Message value types don't match, " +
- "vertex - " + messageValueType +
- ", vertex combiner - " + classList.get(MSG_COMBINER_PARAM_INDEX));
- }
- }
- }
-
- /** Verify that the output format's generic params match the job. */
- private void verifyVertexOutputFormatGenericTypes() {
- Class<? extends VertexOutputFormat<I, V, E>>
- vertexOutputFormatClass =
- BspUtils.<I, V, E>getVertexOutputFormatClass(conf);
- if (vertexOutputFormatClass != null) {
- List<Class<?>> classList =
- ReflectionUtils.getTypeArguments(
- VertexOutputFormat.class, vertexOutputFormatClass);
- if (classList.get(ID_PARAM_INDEX) == null) {
- LOG.warn("Output format vertex index type is not known");
- } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex index types don't match, " +
- "vertex - " + vertexIndexType +
- ", vertex output format - " + classList.get(ID_PARAM_INDEX));
- }
- if (classList.get(VALUE_PARAM_INDEX) == null) {
- LOG.warn("Output format vertex value type is not known");
- } else if (!vertexValueType.equals(classList.get(VALUE_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex value types don't match, " +
- "vertex - " + vertexValueType +
- ", vertex output format - " + classList.get(VALUE_PARAM_INDEX));
- }
- if (classList.get(EDGE_PARAM_INDEX) == null) {
- LOG.warn("Output format edge value type is not known");
- } else if (!edgeValueType.equals(classList.get(EDGE_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Edge value types don't match, " +
- "vertex - " + edgeValueType +
- ", vertex output format - " + classList.get(EDGE_PARAM_INDEX));
- }
- }
- }
-
- /** If there is a vertex resolver,
- * validate the generic parameter types. */
- private void verifyVertexResolverGenericTypes() {
- Class<? extends VertexResolver<I, V, E, M>>
- vrClass = BspUtils.<I, V, E, M>getVertexResolverClass(conf);
- if (!DefaultVertexResolver.class.isAssignableFrom(vrClass)) {
- return;
- }
- Class<? extends DefaultVertexResolver<I, V, E, M>>
- dvrClass =
- (Class<? extends DefaultVertexResolver<I, V, E, M>>) vrClass;
- List<Class<?>> classList =
- ReflectionUtils.getTypeArguments(
- DefaultVertexResolver.class, dvrClass);
- if (classList.get(ID_PARAM_INDEX) != null &&
- !vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex index types don't match, " +
- "vertex - " + vertexIndexType +
- ", vertex resolver - " + classList.get(ID_PARAM_INDEX));
- }
- if (classList.get(VALUE_PARAM_INDEX) != null &&
- !vertexValueType.equals(classList.get(VALUE_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex value types don't match, " +
- "vertex - " + vertexValueType +
- ", vertex resolver - " + classList.get(VALUE_PARAM_INDEX));
- }
- if (classList.get(EDGE_PARAM_INDEX) != null &&
- !edgeValueType.equals(classList.get(EDGE_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Edge value types don't match, " +
- "vertex - " + edgeValueType +
- ", vertex resolver - " + classList.get(EDGE_PARAM_INDEX));
- }
- if (classList.get(MSG_PARAM_INDEX) != null &&
- !messageValueType.equals(classList.get(MSG_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Message value types don't match, " +
- "vertex - " + messageValueType +
- ", vertex resolver - " + classList.get(MSG_PARAM_INDEX));
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java
new file mode 100644
index 0000000..2e703ca
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.job;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Default job observer: keeps its configuration, ignores both callbacks.
+ */
+public class DefaultJobObserver implements GiraphJobObserver,
+ ImmutableClassesGiraphConfigurable {
+ /** Configuration handed to setConf() and returned as-is by getConf() */
+ private ImmutableClassesGiraphConfiguration conf;
+
+ @Override
+ public void setConf(ImmutableClassesGiraphConfiguration configuration) {
+ this.conf = configuration;
+ }
+
+ @Override
+ public ImmutableClassesGiraphConfiguration getConf() {
+ return this.conf;
+ }
+
+ @Override
+ public void launchingJob(Job jobToSubmit) {
+ // do nothing
+ }
+
+ @Override
+ public void jobFinished(Job submittedJob, boolean passed) {
+ // do nothing
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
new file mode 100644
index 0000000..62498c6
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.job;
+
+import org.apache.giraph.bsp.BspInputFormat;
+import org.apache.giraph.bsp.BspOutputFormat;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.DefaultVertexResolver;
+import org.apache.giraph.graph.GraphMapper;
+import org.apache.giraph.vertex.MutableVertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Generates an appropriate internal {@link Job} for using Giraph in Hadoop.
+ * Uses composition to avoid unwanted {@link Job} methods from exposure
+ * to the user.
+ */
+public class GiraphJob {
+ static {
+ Configuration.addDefaultResource("giraph-site.xml");
+ }
+
+ /** Class logger */
+ private static final Logger LOG = Logger.getLogger(GiraphJob.class);
+ /** Internal delegated job to proxy interface requests for Job */
+ private final DelegatedJob delegatedJob;
+ /** Name of the job */
+ private final String jobName;
+ /** Helper configuration from the job */
+ private final GiraphConfiguration giraphConfiguration;
+
+ /**
+ * Delegated job that simply passes along the class GiraphConfiguration.
+ */
+ private class DelegatedJob extends Job {
+ /** Ensure that for job initiation the super.getConfiguration() is used */
+ private boolean jobInited = false;
+
+ /**
+ * Constructor
+ *
+ * @throws IOException
+ */
+ DelegatedJob() throws IOException { }
+
+ @Override
+ public Configuration getConfiguration() {
+ if (jobInited) {
+ return giraphConfiguration;
+ } else {
+ return super.getConfiguration();
+ }
+ }
+ }
+
+ /**
+ * Constructor that will instantiate the configuration
+ *
+ * @param jobName User-defined job name
+ * @throws IOException
+ */
+ public GiraphJob(String jobName) throws IOException {
+ this(new GiraphConfiguration(), jobName);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param configuration User-defined configuration
+ * @param jobName User-defined job name
+ * @throws IOException
+ */
+ public GiraphJob(Configuration configuration,
+ String jobName) throws IOException {
+ this(new GiraphConfiguration(configuration), jobName);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param giraphConfiguration User-defined configuration
+ * @param jobName User-defined job name
+ * @throws IOException
+ */
+ public GiraphJob(GiraphConfiguration giraphConfiguration,
+ String jobName) throws IOException {
+ this.jobName = jobName;
+ this.giraphConfiguration = giraphConfiguration;
+ this.delegatedJob = new DelegatedJob();
+ }
+
+ /**
+ * Get the configuration from the internal job.
+ *
+ * @return Configuration used by the job.
+ */
+ public GiraphConfiguration getConfiguration() {
+ return giraphConfiguration;
+ }
+
+ /**
+ * Be very cautious when using this method as it returns the internal job
+ * of {@link GiraphJob}. This should only be used for methods that require
+ * access to the actual {@link Job}, i.e. FileInputFormat#addInputPath().
+ *
+ * @return Internal job that will actually be submitted to Hadoop.
+ */
+ public Job getInternalJob() {
+ delegatedJob.jobInited = true;
+ return delegatedJob;
+ }
+ /**
+ * Make sure the configuration is set properly by the user prior to
+ * submitting the job; throws IllegalArgumentException on a bad setting.
+ *
+ * @param conf Configuration to check
+ */
+ private void checkConfiguration(ImmutableClassesGiraphConfiguration conf) {
+ if (conf.getMaxWorkers() < 0) {
+ throw new IllegalArgumentException("checkConfiguration: No valid " +
+ GiraphConstants.MAX_WORKERS);
+ }
+ if (conf.getMinPercentResponded() <= 0.0f ||
+ conf.getMinPercentResponded() > 100.0f) {
+ throw new IllegalArgumentException(
+ "checkConfiguration: Invalid " + conf.getMinPercentResponded() +
+ " for " + GiraphConstants.MIN_PERCENT_RESPONDED);
+ }
+ if (conf.getMinWorkers() < 0) {
+ throw new IllegalArgumentException("checkConfiguration: No valid " +
+ GiraphConstants.MIN_WORKERS);
+ }
+ if (conf.getVertexClass() == null) {
+ throw new IllegalArgumentException("checkConfiguration: Null " +
+ GiraphConstants.VERTEX_CLASS);
+ }
+ if (conf.getVertexInputFormatClass() == null &&
+ conf.getEdgeInputFormatClass() == null) {
+ throw new IllegalArgumentException("checkConfiguration: One of " +
+ GiraphConstants.VERTEX_INPUT_FORMAT_CLASS + " and " +
+ GiraphConstants.EDGE_INPUT_FORMAT_CLASS + " must be non-null");
+ }
+ if (conf.getEdgeInputFormatClass() != null &&
+ !(MutableVertex.class.isAssignableFrom(conf.getVertexClass()))) {
+ throw new IllegalArgumentException("checkConfiguration: EdgeInputFormat" +
+ " only works with mutable vertices");
+ }
+ if (conf.getVertexResolverClass() == null) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("checkConfiguration: No class found for " +
+ GiraphConstants.VERTEX_RESOLVER_CLASS + ", defaulting to " +
+ DefaultVertexResolver.class.getCanonicalName());
+ }
+ }
+ }
+
+
+ /**
+ * If the job tracker is "local", do additional checks due to the
+ * restrictions of LocalJobRunner. An unset tracker counts as non-local.
+ *
+ * @param conf Configuration
+ */
+ private static void checkLocalJobRunnerConfiguration(
+ GiraphConfiguration conf) {
+ String jobTracker = conf.get("mapred.job.tracker", null);
+ // Null-safe compare: the lookup above defaults to null when unset
+ if (!"local".equals(jobTracker)) {
+ // Nothing to check
+ return;
+ }
+
+ if (conf.getMaxWorkers() != 1) {
+ throw new IllegalArgumentException(
+ "checkLocalJobRunnerConfiguration: When using " +
+ "LocalJobRunner, must have only one worker since " +
+ "only 1 task at a time!");
+ }
+ if (conf.getSplitMasterWorker()) {
+ throw new IllegalArgumentException(
+ "checkLocalJobRunnerConfiguration: When using " +
+ "LocalJobRunner, you cannot run in split master / worker " +
+ "mode since there is only 1 task at a time!");
+ }
+ }
+
+ /**
+ * Check whether a specified int conf value is set and if not, set it.
+ *
+ * @param param Conf value to check
+ * @param defaultValue Assign to value if not set
+ */
+ private void setIntConfIfDefault(String param, int defaultValue) {
+ if (giraphConfiguration.getInt(param, Integer.MIN_VALUE) ==
+ Integer.MIN_VALUE) {
+ giraphConfiguration.setInt(param, defaultValue);
+ }
+ }
+
+ /**
+ * Runs the actual graph application through Hadoop Map-Reduce.
+ *
+ * @param verbose If true, provide verbose output, false otherwise
+ * @return True if success, false otherwise
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ public final boolean run(boolean verbose)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ // Most users won't hit this hopefully and can set it higher if desired
+ setIntConfIfDefault("mapreduce.job.counters.limit", 512);
+
+ // Capacity scheduler-specific settings. These should be enough for
+ // a reasonable Giraph job
+ setIntConfIfDefault("mapred.job.map.memory.mb", 1024);
+ setIntConfIfDefault("mapred.job.reduce.memory.mb", 1024);
+
+ // Speculative execution doesn't make sense for Giraph
+ giraphConfiguration.setBoolean(
+ "mapred.map.tasks.speculative.execution", false);
+
+ // Set the ping interval to 5 minutes instead of one minute
+ // (DEFAULT_PING_INTERVAL)
+ Client.setPingInterval(giraphConfiguration, 60000 * 5);
+
+ // Should work in MAPREDUCE-1938 to let the user jars/classes
+ // get loaded first
+ giraphConfiguration.setBoolean("mapreduce.user.classpath.first", true);
+ giraphConfiguration.setBoolean("mapreduce.job.user.classpath.first", true);
+
+ // If the checkpoint frequency is 0 (no failure handling), set the max
+ // tasks attempts to be 0 to encourage faster failure of unrecoverable jobs
+ if (giraphConfiguration.getCheckpointFrequency() == 0) {
+ int oldMaxTaskAttempts = giraphConfiguration.getMaxTaskAttempts();
+ giraphConfiguration.setMaxTaskAttempts(0);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("run: Since checkpointing is disabled (default), " +
+ "do not allow any task retries (setting " +
+ GiraphConstants.MAX_TASK_ATTEMPTS + " = 0, " +
+ "old value = " + oldMaxTaskAttempts + ")");
+ }
+ }
+
+ // Set the job properties, check them, and submit the job
+ ImmutableClassesGiraphConfiguration conf =
+ new ImmutableClassesGiraphConfiguration(giraphConfiguration);
+ checkConfiguration(conf);
+ checkLocalJobRunnerConfiguration(conf);
+ Job submittedJob = new Job(conf, jobName);
+ if (submittedJob.getJar() == null) {
+ submittedJob.setJarByClass(getClass());
+ }
+ submittedJob.setNumReduceTasks(0);
+ submittedJob.setMapperClass(GraphMapper.class);
+ submittedJob.setInputFormatClass(BspInputFormat.class);
+ submittedJob.setOutputFormatClass(BspOutputFormat.class);
+
+ GiraphJobObserver jobObserver = conf.getJobObserver();
+ jobObserver.launchingJob(submittedJob);
+ boolean passed = submittedJob.waitForCompletion(verbose);
+ jobObserver.jobFinished(submittedJob, passed);
+
+ return passed;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java
new file mode 100644
index 0000000..fbcc4f1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.job;
+
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * An observer over the job launch lifecycle.
+ */
+public interface GiraphJobObserver {
+ /**
+ * Callback for job about to start.
+ * @param jobToSubmit Job we're going to submit to hadoop.
+ */
+ void launchingJob(Job jobToSubmit);
+
+ /**
+ * Callback when job finishes.
+ * @param submittedJob Job that ran in hadoop.
+ * @param passed true if job succeeded.
+ */
+ void jobFinished(Job submittedJob, boolean passed);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/job/GiraphTypeValidator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphTypeValidator.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphTypeValidator.java
new file mode 100644
index 0000000..de2a66d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphTypeValidator.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.job;
+
+import org.apache.giraph.bsp.BspUtils;
+import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.graph.DefaultVertexResolver;
+import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import java.lang.reflect.Type;
+import java.util.List;
+
+/**
+ * GiraphTypeValidator attempts to verify the consistency of
+ * user-chosen InputFormat, OutputFormat, and Vertex type
+ * parameters before the job run actually begins.
+ *
+ * @param <I> the Vertex ID type
+ * @param <V> the Vertex Value type
+ * @param <E> the Edge Value type
+ * @param <M> the Message type
+ */
+public class GiraphTypeValidator<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable> {
+ /**
+ * Class logger object (shared by all instances, never reassigned).
+ */
+ private static final Logger LOG =
+ Logger.getLogger(GiraphTypeValidator.class);
+
+ /** I param vertex index in classList */
+ private static final int ID_PARAM_INDEX = 0;
+ /** V param vertex index in classList */
+ private static final int VALUE_PARAM_INDEX = 1;
+ /** E param vertex index in classList */
+ private static final int EDGE_PARAM_INDEX = 2;
+ /** M param vertex index in classList */
+ private static final int MSG_PARAM_INDEX = 3;
+ /** M param vertex combiner index in classList */
+ private static final int MSG_COMBINER_PARAM_INDEX = 1;
+ /** E param edge input format index in classList */
+ private static final int EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX = 1;
+
+ /** Vertex Index Type */
+ private Type vertexIndexType;
+ /** Vertex Value Type */
+ private Type vertexValueType;
+ /** Edge Value Type */
+ private Type edgeValueType;
+ /** Message Value Type */
+ private Type messageValueType;
+
+ /**
+ * The Configuration object for use in the validation test.
+ */
+ private Configuration conf;
+
+ /**
+ * Constructor to execute the validation test, throws
+ * unchecked exception to end job run on failure.
+ *
+ * @param conf the Configuration for this run.
+ */
+ public GiraphTypeValidator(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Make sure that all registered classes have matching types. This
+ * is a little tricky due to type erasure, cannot simply get them from
+ * the class type arguments. Also, set the vertex index, vertex value,
+ * edge value and message value classes.
+ */
+ public void validateClassTypes() {
+ Class<? extends Vertex<I, V, E, M>> vertexClass =
+ BspUtils.<I, V, E, M>getVertexClass(conf);
+ List<Class<?>> classList = ReflectionUtils.getTypeArguments(
+ Vertex.class, vertexClass);
+ vertexIndexType = classList.get(ID_PARAM_INDEX);
+ vertexValueType = classList.get(VALUE_PARAM_INDEX);
+ edgeValueType = classList.get(EDGE_PARAM_INDEX);
+ messageValueType = classList.get(MSG_PARAM_INDEX);
+ verifyVertexInputFormatGenericTypes();
+ verifyEdgeInputFormatGenericTypes();
+ verifyVertexOutputFormatGenericTypes();
+ verifyVertexResolverGenericTypes();
+ verifyVertexCombinerGenericTypes();
+ }
+
+ /** Verify matching generic types in VertexInputFormat. */
+ private void verifyVertexInputFormatGenericTypes() {
+ Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass =
+ BspUtils.<I, V, E, M>getVertexInputFormatClass(conf);
+ if (vertexInputFormatClass != null) {
+ List<Class<?>> classList =
+ ReflectionUtils.getTypeArguments(
+ VertexInputFormat.class, vertexInputFormatClass);
+ if (classList.get(ID_PARAM_INDEX) == null) {
+ LOG.warn("Input format vertex index type is not known");
+ } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
+ throw new IllegalArgumentException(
+ "checkClassTypes: Vertex index types don't match, " +
+ "vertex - " + vertexIndexType +
+ ", vertex input format - " + classList.get(ID_PARAM_INDEX));
+ }
+ if (classList.get(VALUE_PARAM_INDEX) == null) {
+ LOG.warn("Input format vertex value type is not known");
+ } else if (!vertexValueType.equals(classList.get(VALUE_PARAM_INDEX))) {
+ throw new IllegalArgumentException(
+ "checkClassTypes: Vertex value types don't match, " +
+ "vertex - " + vertexValueType +
+ ", vertex input format - " + classList.get(VALUE_PARAM_INDEX));
+ }
+ if (classList.get(EDGE_PARAM_INDEX) == null) {
+ LOG.warn("Input format edge value type is not known");
+ } else if (!edgeValueType.equals(classList.get(EDGE_PARAM_INDEX))) {
+ throw new IllegalArgumentException(
+ "checkClassTypes: Edge value types don't match, " +
+ "vertex - " + edgeValueType +
+ ", vertex input format - " + classList.get(EDGE_PARAM_INDEX));
+ }
+ }
+ }
+
+ /**
+  * Check the generic parameters of the configured EdgeInputFormat, if any,
+  * against the vertex index and edge value types of the job.
+  * A type argument that comes back as null is only logged as a warning;
+  * a non-null type argument that differs from the job's type is rejected
+  * with an IllegalArgumentException.
+  */
+ private void verifyEdgeInputFormatGenericTypes() {
+   Class<? extends EdgeInputFormat<I, E>> inputFormat =
+       BspUtils.<I, E>getEdgeInputFormatClass(conf);
+   // No edge input format configured: nothing to validate.
+   if (inputFormat == null) {
+     return;
+   }
+   List<Class<?>> typeArgs = ReflectionUtils.getTypeArguments(
+       EdgeInputFormat.class, inputFormat);
+   Class<?> idType = typeArgs.get(ID_PARAM_INDEX);
+   if (idType != null) {
+     if (!vertexIndexType.equals(idType)) {
+       throw new IllegalArgumentException(
+           "checkClassTypes: Vertex index types don't match, " +
+           "vertex - " + vertexIndexType +
+           ", edge input format - " + idType);
+     }
+   } else {
+     LOG.warn("Input format vertex index type is not known");
+   }
+   // The edge value uses an EdgeInputFormat-specific parameter index.
+   Class<?> edgeType = typeArgs.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX);
+   if (edgeType != null) {
+     if (!edgeValueType.equals(edgeType)) {
+       throw new IllegalArgumentException(
+           "checkClassTypes: Edge value types don't match, " +
+           "vertex - " + edgeValueType +
+           ", edge input format - " +
+           edgeType);
+     }
+   } else {
+     LOG.warn("Input format edge value type is not known");
+   }
+ }
+
+ /**
+  * When a combiner class is configured, check that its vertex index and
+  * message value type arguments equal the ones used by the job.
+  * There is no null tolerance here: a type argument that is null fails the
+  * equals check as well and therefore triggers the
+  * IllegalArgumentException.
+  */
+ private void verifyVertexCombinerGenericTypes() {
+   Class<? extends Combiner<I, M>> combinerClass =
+       BspUtils.<I, M>getCombinerClass(conf);
+   // A combiner is optional; skip validation when none is set.
+   if (combinerClass == null) {
+     return;
+   }
+   List<Class<?>> typeArgs = ReflectionUtils.getTypeArguments(
+       Combiner.class, combinerClass);
+   Class<?> idType = typeArgs.get(ID_PARAM_INDEX);
+   boolean idMatches = vertexIndexType.equals(idType);
+   if (!idMatches) {
+     throw new IllegalArgumentException(
+         "checkClassTypes: Vertex index types don't match, " +
+         "vertex - " + vertexIndexType +
+         ", vertex combiner - " + idType);
+   }
+   Class<?> msgType = typeArgs.get(MSG_COMBINER_PARAM_INDEX);
+   boolean msgMatches = messageValueType.equals(msgType);
+   if (!msgMatches) {
+     throw new IllegalArgumentException(
+         "checkClassTypes: Message value types don't match, " +
+         "vertex - " + messageValueType +
+         ", vertex combiner - " + msgType);
+   }
+ }
+
+ /**
+  * Check the generic parameters of the configured VertexOutputFormat, if
+  * any, against the vertex index, vertex value and edge value types of the
+  * job. A null type argument is only logged as a warning; a non-null type
+  * argument that differs from the job's type is rejected with an
+  * IllegalArgumentException.
+  */
+ private void verifyVertexOutputFormatGenericTypes() {
+   Class<? extends VertexOutputFormat<I, V, E>> outputFormat =
+       BspUtils.<I, V, E>getVertexOutputFormatClass(conf);
+   // No vertex output format configured: nothing to validate.
+   if (outputFormat == null) {
+     return;
+   }
+   List<Class<?>> typeArgs = ReflectionUtils.getTypeArguments(
+       VertexOutputFormat.class, outputFormat);
+   Class<?> idType = typeArgs.get(ID_PARAM_INDEX);
+   if (idType != null) {
+     if (!vertexIndexType.equals(idType)) {
+       throw new IllegalArgumentException(
+           "checkClassTypes: Vertex index types don't match, " +
+           "vertex - " + vertexIndexType +
+           ", vertex output format - " + idType);
+     }
+   } else {
+     LOG.warn("Output format vertex index type is not known");
+   }
+   Class<?> valueType = typeArgs.get(VALUE_PARAM_INDEX);
+   if (valueType != null) {
+     if (!vertexValueType.equals(valueType)) {
+       throw new IllegalArgumentException(
+           "checkClassTypes: Vertex value types don't match, " +
+           "vertex - " + vertexValueType +
+           ", vertex output format - " + valueType);
+     }
+   } else {
+     LOG.warn("Output format vertex value type is not known");
+   }
+   Class<?> edgeType = typeArgs.get(EDGE_PARAM_INDEX);
+   if (edgeType != null) {
+     if (!edgeValueType.equals(edgeType)) {
+       throw new IllegalArgumentException(
+           "checkClassTypes: Edge value types don't match, " +
+           "vertex - " + edgeValueType +
+           ", vertex output format - " + edgeType);
+     }
+   } else {
+     LOG.warn("Output format edge value type is not known");
+   }
+ }
+
+ /**
+  * Validate the generic parameter types of the configured vertex resolver.
+  * Only resolvers that are assignable to DefaultVertexResolver are
+  * inspected; any other resolver class is left unchecked. A null type
+  * argument is skipped without logging, while a non-null one must equal
+  * the job's corresponding type or an IllegalArgumentException is thrown.
+  */
+ private void verifyVertexResolverGenericTypes() {
+   Class<? extends VertexResolver<I, V, E, M>> resolverClass =
+       BspUtils.<I, V, E, M>getVertexResolverClass(conf);
+   boolean isDefaultResolver =
+       DefaultVertexResolver.class.isAssignableFrom(resolverClass);
+   if (!isDefaultResolver) {
+     return;
+   }
+   // The assignability test above is what makes this narrowing cast valid.
+   Class<? extends DefaultVertexResolver<I, V, E, M>> defaultResolverClass =
+       (Class<? extends DefaultVertexResolver<I, V, E, M>>) resolverClass;
+   List<Class<?>> typeArgs = ReflectionUtils.getTypeArguments(
+       DefaultVertexResolver.class, defaultResolverClass);
+   Class<?> idType = typeArgs.get(ID_PARAM_INDEX);
+   if (idType != null) {
+     if (!vertexIndexType.equals(idType)) {
+       throw new IllegalArgumentException(
+           "checkClassTypes: Vertex index types don't match, " +
+           "vertex - " + vertexIndexType +
+           ", vertex resolver - " + idType);
+     }
+   }
+   Class<?> valueType = typeArgs.get(VALUE_PARAM_INDEX);
+   if (valueType != null) {
+     if (!vertexValueType.equals(valueType)) {
+       throw new IllegalArgumentException(
+           "checkClassTypes: Vertex value types don't match, " +
+           "vertex - " + vertexValueType +
+           ", vertex resolver - " + valueType);
+     }
+   }
+   Class<?> edgeType = typeArgs.get(EDGE_PARAM_INDEX);
+   if (edgeType != null) {
+     if (!edgeValueType.equals(edgeType)) {
+       throw new IllegalArgumentException(
+           "checkClassTypes: Edge value types don't match, " +
+           "vertex - " + edgeValueType +
+           ", vertex resolver - " + edgeType);
+     }
+   }
+   Class<?> msgType = typeArgs.get(MSG_PARAM_INDEX);
+   if (msgType != null) {
+     if (!messageValueType.equals(msgType)) {
+       throw new IllegalArgumentException(
+           "checkClassTypes: Message value types don't match, " +
+           "vertex - " + messageValueType +
+           ", vertex resolver - " + msgType);
+     }
+   }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/job/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/package-info.java b/giraph-core/src/main/java/org/apache/giraph/job/package-info.java
new file mode 100644
index 0000000..c89d6af
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of all the classes involved in job launch.
+ */
+package org.apache.giraph.job;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index 6c7f34d..ae8ebf3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -21,7 +21,7 @@ package org.apache.giraph.utils;
import org.apache.giraph.conf.GiraphClasses;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.io.formats.GiraphFileInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/BspCase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/BspCase.java b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
index f43efe9..6aab533 100644
--- a/giraph-core/src/test/java/org/apache/giraph/BspCase.java
+++ b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
@@ -22,7 +22,7 @@ import org.apache.giraph.conf.GiraphClasses;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.examples.GeneratedVertexReader;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.utils.FileUtils;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.conf.Configuration;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestAutoCheckpoint.java b/giraph-core/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
index 87af297..efbe320 100644
--- a/giraph-core/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
+++ b/giraph-core/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
@@ -24,7 +24,7 @@ import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.examples.SimpleCheckpointVertex;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
index faf4126..0d6d1d0 100644
--- a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -35,7 +35,7 @@ import org.apache.giraph.examples.SimpleSuperstepVertex;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
import org.apache.giraph.vertex.EdgeListVertex;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.worker.InputSplitPathOrganizer;
import org.apache.giraph.aggregators.TextAggregatorWriter;
import org.apache.giraph.vertex.Vertex;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/TestGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestGraphPartitioner.java b/giraph-core/src/test/java/org/apache/giraph/TestGraphPartitioner.java
index ff71b86..2e12bdc 100644
--- a/giraph-core/src/test/java/org/apache/giraph/TestGraphPartitioner.java
+++ b/giraph-core/src/test/java/org/apache/giraph/TestGraphPartitioner.java
@@ -23,7 +23,7 @@ import org.apache.giraph.examples.GeneratedVertexReader;
import org.apache.giraph.examples.SimpleCheckpointVertex;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.partition.HashRangePartitionerFactory;
import org.apache.giraph.partition.PartitionBalancer;
import org.apache.giraph.integration.SuperstepHashPartitionerFactory;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/TestManualCheckpoint.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestManualCheckpoint.java b/giraph-core/src/test/java/org/apache/giraph/TestManualCheckpoint.java
index 7d3dde3..eb2338c 100644
--- a/giraph-core/src/test/java/org/apache/giraph/TestManualCheckpoint.java
+++ b/giraph-core/src/test/java/org/apache/giraph/TestManualCheckpoint.java
@@ -23,7 +23,7 @@ import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.examples.SimpleCheckpointVertex;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/TestMutateGraph.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestMutateGraph.java b/giraph-core/src/test/java/org/apache/giraph/TestMutateGraph.java
index 7bac9e8..0427b85 100644
--- a/giraph-core/src/test/java/org/apache/giraph/TestMutateGraph.java
+++ b/giraph-core/src/test/java/org/apache/giraph/TestMutateGraph.java
@@ -22,7 +22,7 @@ import org.apache.giraph.conf.GiraphClasses;
import org.apache.giraph.examples.SimpleMutateGraphVertex;
import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexOutputFormat;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java b/giraph-core/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
index 7654305..759624b 100644
--- a/giraph-core/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
+++ b/giraph-core/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
@@ -22,7 +22,7 @@ import org.apache.giraph.conf.GiraphClasses;
import org.apache.giraph.examples.SimpleCheckpointVertex;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
index 40db41f..7deeb42 100644
--- a/giraph-core/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
+++ b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
@@ -26,7 +26,7 @@ import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.giraph.examples.AggregatorsTestVertex;
import org.apache.giraph.examples.SimpleCheckpointVertex;
import org.apache.giraph.examples.SimplePageRankVertex;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.master.MasterAggregatorHandler;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/examples/TestPageRank.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/examples/TestPageRank.java b/giraph-core/src/test/java/org/apache/giraph/examples/TestPageRank.java
index 3909f46..5e61596 100644
--- a/giraph-core/src/test/java/org/apache/giraph/examples/TestPageRank.java
+++ b/giraph-core/src/test/java/org/apache/giraph/examples/TestPageRank.java
@@ -20,7 +20,7 @@ package org.apache.giraph.examples;
import org.apache.giraph.BspCase;
import org.apache.giraph.conf.GiraphClasses;
import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.partition.HashMasterPartitioner;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java b/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
index 13f4fe1..e2f6fd3 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
@@ -21,7 +21,7 @@ import org.apache.giraph.BspCase;
import org.apache.giraph.benchmark.EdgeListVertexPageRankBenchmark;
import org.apache.giraph.benchmark.PageRankComputation;
import org.apache.giraph.conf.GiraphClasses;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.io.formats.GiraphFileInputFormat;
import org.apache.giraph.io.formats.JsonBase64VertexInputFormat;
import org.apache.giraph.io.formats.JsonBase64VertexOutputFormat;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java
index 185ba50..a0b16e2 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java
@@ -19,7 +19,7 @@ package org.apache.giraph.partition;
import org.apache.giraph.graph.DefaultEdge;
import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.graph.GiraphTransferRegulator;
import org.apache.giraph.vertex.EdgeListVertex;
import org.apache.hadoop.io.DoubleWritable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/vertex/TestVertexTypes.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/vertex/TestVertexTypes.java b/giraph-core/src/test/java/org/apache/giraph/vertex/TestVertexTypes.java
index 1a260d5..80187ef 100644
--- a/giraph-core/src/test/java/org/apache/giraph/vertex/TestVertexTypes.java
+++ b/giraph-core/src/test/java/org/apache/giraph/vertex/TestVertexTypes.java
@@ -21,7 +21,7 @@ package org.apache.giraph.vertex;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.graph.GiraphTypeValidator;
+import org.apache.giraph.job.GiraphTypeValidator;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/TestHBaseRootMarkerVertextFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/TestHBaseRootMarkerVertextFormat.java b/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/TestHBaseRootMarkerVertextFormat.java
index cc7d148..fe5b72e 100644
--- a/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/TestHBaseRootMarkerVertextFormat.java
+++ b/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/TestHBaseRootMarkerVertextFormat.java
@@ -22,7 +22,7 @@ package org.apache.giraph.io.hbase;
import org.apache.giraph.BspCase;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.vertex.EdgeListVertex;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.io.hbase.edgemarker.TableEdgeInputFormat;
import org.apache.giraph.io.hbase.edgemarker.TableEdgeOutputFormat;
import org.apache.hadoop.conf.Configuration;
http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java
index 20d13ec..fbcef72 100644
--- a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java
+++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java
@@ -25,7 +25,7 @@ import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.giraph.io.EdgeInputFormat;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.vertex.Vertex;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;