You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by er...@apache.org on 2013/02/05 23:28:59 UTC

git commit: GIRAPH-500: Refactor job launch code out of graph package and into job package (ereisman)

Updated Branches:
  refs/heads/trunk 01b353334 -> 0c77d52f3


GIRAPH-500: Refactor job launch code out of graph package and into job package (ereisman)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/0c77d52f
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/0c77d52f
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/0c77d52f

Branch: refs/heads/trunk
Commit: 0c77d52f3c7eee138f36eeb831b4afab45216b2f
Parents: 01b3533
Author: Eli Reisman <er...@apache.org>
Authored: Tue Feb 5 14:28:32 2013 -0800
Committer: Eli Reisman <er...@apache.org>
Committed: Tue Feb 5 14:28:32 2013 -0800

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../io/accumulo/TestAccumuloVertexFormat.java      |    2 +-
 .../main/java/org/apache/giraph/GiraphRunner.java  |    4 +-
 .../giraph/benchmark/AggregatorsBenchmark.java     |    2 +-
 .../apache/giraph/benchmark/PageRankBenchmark.java |    2 +-
 .../giraph/benchmark/RandomMessageBenchmark.java   |    2 +-
 .../giraph/benchmark/ShortestPathsBenchmark.java   |    2 +-
 .../apache/giraph/conf/GiraphConfiguration.java    |    4 +-
 .../conf/ImmutableClassesGiraphConfiguration.java  |    2 +-
 .../giraph/examples/SimpleCheckpointVertex.java    |    2 +-
 .../examples/SimpleVertexWithWorkerContext.java    |    2 +-
 .../apache/giraph/graph/DefaultJobObserver.java    |   52 ---
 .../java/org/apache/giraph/graph/GiraphJob.java    |  287 --------------
 .../org/apache/giraph/graph/GiraphJobObserver.java |   39 --
 .../apache/giraph/graph/GiraphTypeValidator.java   |  280 --------------
 .../org/apache/giraph/job/DefaultJobObserver.java  |   52 +++
 .../main/java/org/apache/giraph/job/GiraphJob.java |  289 +++++++++++++++
 .../org/apache/giraph/job/GiraphJobObserver.java   |   39 ++
 .../org/apache/giraph/job/GiraphTypeValidator.java |  282 ++++++++++++++
 .../java/org/apache/giraph/job/package-info.java   |   21 +
 .../apache/giraph/utils/InternalVertexRunner.java  |    2 +-
 .../src/test/java/org/apache/giraph/BspCase.java   |    2 +-
 .../java/org/apache/giraph/TestAutoCheckpoint.java |    2 +-
 .../test/java/org/apache/giraph/TestBspBasic.java  |    2 +-
 .../org/apache/giraph/TestGraphPartitioner.java    |    2 +-
 .../org/apache/giraph/TestManualCheckpoint.java    |    2 +-
 .../java/org/apache/giraph/TestMutateGraph.java    |    2 +-
 .../org/apache/giraph/TestNotEnoughMapTasks.java   |    2 +-
 .../aggregators/TestAggregatorsHandling.java       |    2 +-
 .../org/apache/giraph/examples/TestPageRank.java   |    2 +-
 .../org/apache/giraph/io/TestJsonBase64Format.java |    2 +-
 .../partition/TestGiraphTransferRegulator.java     |    2 +-
 .../org/apache/giraph/vertex/TestVertexTypes.java  |    2 +-
 .../io/hbase/TestHBaseRootMarkerVertextFormat.java |    2 +-
 .../giraph/io/hcatalog/HiveGiraphRunner.java       |    2 +-
 35 files changed, 712 insertions(+), 685 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index cc0ca34..2e25026 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-500: Refactor job launch code out of graph package and into job package (ereisman)
+
   GIRAPH-493: Remove EdgeWithSource (nitay)
 
   GIRAPH-429: Number of input split threads set to 1 less than necessary (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java
----------------------------------------------------------------------
diff --git a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java
index e68e18d..8894199 100644
--- a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java
+++ b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java
@@ -36,7 +36,7 @@ import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.io.accumulo.edgemarker.AccumuloEdgeInputFormat;
 import org.apache.giraph.io.accumulo.edgemarker.AccumuloEdgeOutputFormat;
 import org.apache.giraph.vertex.EdgeListVertex;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
index 2e88a83..b6a6113 100644
--- a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
@@ -28,8 +28,8 @@ import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.examples.Algorithm;
-import org.apache.giraph.graph.GiraphJob;
-import org.apache.giraph.graph.GiraphTypeValidator;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.giraph.job.GiraphTypeValidator;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
index f9a0730..a82a9f8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
@@ -27,7 +27,7 @@ import org.apache.giraph.aggregators.LongSumAggregator;
 import org.apache.giraph.master.DefaultMasterCompute;
 import org.apache.giraph.worker.DefaultWorkerContext;
 import org.apache.giraph.vertex.EdgeListVertex;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
index 6e49812..7ebcb93 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
@@ -25,7 +25,7 @@ import org.apache.commons.cli.PosixParser;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.combiner.DoubleSumCombiner;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.io.formats.JsonBase64VertexOutputFormat;
 import org.apache.giraph.io.formats.PseudoRandomEdgeInputFormat;
 import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
index d48aa6d..d0d80af 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
@@ -27,7 +27,7 @@ import org.apache.giraph.aggregators.LongSumAggregator;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.master.DefaultMasterCompute;
 import org.apache.giraph.vertex.EdgeListVertex;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
index 888532d..52bbac4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
@@ -26,7 +26,7 @@ import org.apache.commons.cli.PosixParser;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.combiner.MinimumDoubleCombiner;
 import org.apache.giraph.vertex.EdgeListVertex;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 79b12d3..b3b9c4b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -20,8 +20,8 @@ package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.graph.DefaultJobObserver;
-import org.apache.giraph.graph.GiraphJobObserver;
+import org.apache.giraph.job.DefaultJobObserver;
+import org.apache.giraph.job.GiraphJobObserver;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 2513d8f..d75d624 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -20,7 +20,7 @@ package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.graph.GiraphJobObserver;
+import org.apache.giraph.job.GiraphJobObserver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.master.MasterCompute;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java b/giraph-core/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
index ee62c99..337f30e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
+++ b/giraph-core/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
@@ -26,7 +26,7 @@ import org.apache.commons.cli.PosixParser;
 import org.apache.giraph.aggregators.LongSumAggregator;
 import org.apache.giraph.graph.DefaultEdge;
 import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
 import org.apache.giraph.master.DefaultMasterCompute;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
index b7605bb..f6488d5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
+++ b/giraph-core/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
@@ -21,7 +21,7 @@ package org.apache.giraph.examples;
 import org.apache.giraph.examples.SimpleSuperstepVertex.
     SimpleSuperstepVertexInputFormat;
 import org.apache.giraph.vertex.EdgeListVertex;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/graph/DefaultJobObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultJobObserver.java
deleted file mode 100644
index 9f0418a..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultJobObserver.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * Default implementation of job observer that does nothing.
- */
-public class DefaultJobObserver implements GiraphJobObserver,
-    ImmutableClassesGiraphConfigurable {
-  /** configuration object stored here */
-  private ImmutableClassesGiraphConfiguration conf;
-
-  @Override
-  public void setConf(ImmutableClassesGiraphConfiguration configuration) {
-    this.conf = configuration;
-  }
-
-  @Override
-  public ImmutableClassesGiraphConfiguration getConf() {
-    return this.conf;
-  }
-
-  @Override
-  public void launchingJob(Job jobToSubmit) {
-    // do nothing
-  }
-
-  @Override
-  public void jobFinished(Job jobToSubmit, boolean passed) {
-    // do nothing
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java
deleted file mode 100644
index 18c0db2..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.bsp.BspInputFormat;
-import org.apache.giraph.bsp.BspOutputFormat;
-import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.vertex.MutableVertex;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-
-/**
- * Generates an appropriate internal {@link Job} for using Giraph in Hadoop.
- * Uses composition to avoid unwanted {@link Job} methods from exposure
- * to the user.
- */
-public class GiraphJob {
-  static {
-    Configuration.addDefaultResource("giraph-site.xml");
-  }
-
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(GiraphJob.class);
-  /** Internal delegated job to proxy interface requests for Job */
-  private final DelegatedJob delegatedJob;
-  /** Name of the job */
-  private final String jobName;
-  /** Helper configuration from the job */
-  private final GiraphConfiguration giraphConfiguration;
-
-  /**
-   * Delegated job that simply passes along the class GiraphConfiguration.
-   */
-  private class DelegatedJob extends Job {
-    /** Ensure that for job initiation the super.getConfiguration() is used */
-    private boolean jobInited = false;
-
-    /**
-     * Constructor
-     *
-     * @throws IOException
-     */
-    DelegatedJob() throws IOException { }
-
-    @Override
-    public Configuration getConfiguration() {
-      if (jobInited) {
-        return giraphConfiguration;
-      } else {
-        return super.getConfiguration();
-      }
-    }
-  }
-
-  /**
-   * Constructor that will instantiate the configuration
-   *
-   * @param jobName User-defined job name
-   * @throws IOException
-   */
-  public GiraphJob(String jobName) throws IOException {
-    this(new GiraphConfiguration(), jobName);
-  }
-
-  /**
-   * Constructor.
-   *
-   * @param configuration User-defined configuration
-   * @param jobName User-defined job name
-   * @throws IOException
-   */
-  public GiraphJob(Configuration configuration,
-                   String jobName) throws IOException {
-    this(new GiraphConfiguration(configuration), jobName);
-  }
-
-  /**
-   * Constructor.
-   *
-   * @param giraphConfiguration User-defined configuration
-   * @param jobName User-defined job name
-   * @throws IOException
-   */
-  public GiraphJob(GiraphConfiguration giraphConfiguration,
-                   String jobName) throws IOException {
-    this.jobName = jobName;
-    this.giraphConfiguration = giraphConfiguration;
-    this.delegatedJob = new DelegatedJob();
-  }
-
-  /**
-   * Get the configuration from the internal job.
-   *
-   * @return Configuration used by the job.
-   */
-  public GiraphConfiguration getConfiguration() {
-    return giraphConfiguration;
-  }
-
-  /**
-   * Be very cautious when using this method as it returns the internal job
-   * of {@link GiraphJob}.  This should only be used for methods that require
-   * access to the actual {@link Job}, i.e. FileInputFormat#addInputPath().
-   *
-   * @return Internal job that will actually be submitted to Hadoop.
-   */
-  public Job getInternalJob() {
-    delegatedJob.jobInited = true;
-    return delegatedJob;
-  }
-  /**
-   * Make sure the configuration is set properly by the user prior to
-   * submitting the job.
-   *
-   * @param conf Configuration to check
-   */
-  private void checkConfiguration(ImmutableClassesGiraphConfiguration conf) {
-    if (conf.getMaxWorkers() < 0) {
-      throw new RuntimeException("checkConfiguration: No valid " +
-          GiraphConstants.MAX_WORKERS);
-    }
-    if (conf.getMinPercentResponded() <= 0.0f ||
-        conf.getMinPercentResponded() > 100.0f) {
-      throw new IllegalArgumentException(
-          "checkConfiguration: Invalid " + conf.getMinPercentResponded() +
-              " for " + GiraphConstants.MIN_PERCENT_RESPONDED);
-    }
-    if (conf.getMinWorkers() < 0) {
-      throw new IllegalArgumentException("checkConfiguration: No valid " +
-          GiraphConstants.MIN_WORKERS);
-    }
-    if (conf.getVertexClass() == null) {
-      throw new IllegalArgumentException("checkConfiguration: Null" +
-          GiraphConstants.VERTEX_CLASS);
-    }
-    if (conf.getVertexInputFormatClass() == null &&
-        conf.getEdgeInputFormatClass() == null) {
-      throw new IllegalArgumentException("checkConfiguration: One of " +
-          GiraphConstants.VERTEX_INPUT_FORMAT_CLASS + " and " +
-          GiraphConstants.EDGE_INPUT_FORMAT_CLASS + " must be non-null");
-    }
-    if (conf.getEdgeInputFormatClass() != null &&
-        !(MutableVertex.class.isAssignableFrom(conf.getVertexClass()))) {
-      throw new IllegalArgumentException("checkConfiguration: EdgeInputFormat" +
-          " only works with mutable vertices");
-    }
-    if (conf.getVertexResolverClass() == null) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("checkConfiguration: No class found for " +
-            GiraphConstants.VERTEX_RESOLVER_CLASS + ", defaulting to " +
-            DefaultVertexResolver.class.getCanonicalName());
-      }
-    }
-  }
-
-
-  /**
-   * Check if the configuration is local.  If it is local, do additional
-   * checks due to the restrictions of LocalJobRunner.
-   *
-   * @param conf Configuration
-   */
-  private static void checkLocalJobRunnerConfiguration(
-      GiraphConfiguration conf) {
-    String jobTracker = conf.get("mapred.job.tracker", null);
-    if (!jobTracker.equals("local")) {
-      // Nothing to check
-      return;
-    }
-
-    int maxWorkers = conf.getMaxWorkers();
-    if (maxWorkers != 1) {
-      throw new IllegalArgumentException(
-          "checkLocalJobRunnerConfiguration: When using " +
-              "LocalJobRunner, must have only one worker since " +
-          "only 1 task at a time!");
-    }
-    if (conf.getSplitMasterWorker()) {
-      throw new IllegalArgumentException(
-          "checkLocalJobRunnerConfiguration: When using " +
-              "LocalJobRunner, you cannot run in split master / worker " +
-          "mode since there is only 1 task at a time!");
-    }
-  }
-
-  /**
-   * Check whether a specified int conf value is set and if not, set it.
-   *
-   * @param param Conf value to check
-   * @param defaultValue Assign to value if not set
-   */
-  private void setIntConfIfDefault(String param, int defaultValue) {
-    if (giraphConfiguration.getInt(param, Integer.MIN_VALUE) ==
-        Integer.MIN_VALUE) {
-      giraphConfiguration.setInt(param, defaultValue);
-    }
-  }
-
-  /**
-   * Runs the actual graph application through Hadoop Map-Reduce.
-   *
-   * @param verbose If true, provide verbose output, false otherwise
-   * @return True if success, false otherwise
-   * @throws ClassNotFoundException
-   * @throws InterruptedException
-   * @throws IOException
-   */
-  public final boolean run(boolean verbose)
-    throws IOException, InterruptedException, ClassNotFoundException {
-    // Most users won't hit this hopefully and can set it higher if desired
-    setIntConfIfDefault("mapreduce.job.counters.limit", 512);
-
-    // Capacity scheduler-specific settings.  These should be enough for
-    // a reasonable Giraph job
-    setIntConfIfDefault("mapred.job.map.memory.mb", 1024);
-    setIntConfIfDefault("mapred.job.reduce.memory.mb", 1024);
-
-    // Speculative execution doesn't make sense for Giraph
-    giraphConfiguration.setBoolean(
-        "mapred.map.tasks.speculative.execution", false);
-
-    // Set the ping interval to 5 minutes instead of one minute
-    // (DEFAULT_PING_INTERVAL)
-    Client.setPingInterval(giraphConfiguration, 60000 * 5);
-
-    // Should work in MAPREDUCE-1938 to let the user jars/classes
-    // get loaded first
-    giraphConfiguration.setBoolean("mapreduce.user.classpath.first", true);
-    giraphConfiguration.setBoolean("mapreduce.job.user.classpath.first", true);
-
-    // If the checkpoint frequency is 0 (no failure handling), set the max
-    // tasks attempts to be 0 to encourage faster failure of unrecoverable jobs
-    if (giraphConfiguration.getCheckpointFrequency() == 0) {
-      int oldMaxTaskAttempts = giraphConfiguration.getMaxTaskAttempts();
-      giraphConfiguration.setMaxTaskAttempts(0);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("run: Since checkpointing is disabled (default), " +
-            "do not allow any task retries (setting " +
-            GiraphConstants.MAX_TASK_ATTEMPTS + " = 0, " +
-            "old value = " + oldMaxTaskAttempts + ")");
-      }
-    }
-
-    // Set the job properties, check them, and submit the job
-    ImmutableClassesGiraphConfiguration conf =
-        new ImmutableClassesGiraphConfiguration(giraphConfiguration);
-    checkConfiguration(conf);
-    checkLocalJobRunnerConfiguration(conf);
-    Job submittedJob = new Job(conf, jobName);
-    if (submittedJob.getJar() == null) {
-      submittedJob.setJarByClass(getClass());
-    }
-    submittedJob.setNumReduceTasks(0);
-    submittedJob.setMapperClass(GraphMapper.class);
-    submittedJob.setInputFormatClass(BspInputFormat.class);
-    submittedJob.setOutputFormatClass(BspOutputFormat.class);
-
-    GiraphJobObserver jobObserver = conf.getJobObserver();
-    jobObserver.launchingJob(submittedJob);
-    boolean passed = submittedJob.waitForCompletion(verbose);
-    jobObserver.jobFinished(submittedJob, passed);
-
-    return passed;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJobObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJobObserver.java
deleted file mode 100644
index 7aec8c2..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJobObserver.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * An observer over the job launch lifecycle.
- */
-public interface GiraphJobObserver {
-  /**
-   * Callback for job about to start.
-   * @param jobToSubmit Job we're going to submit to hadoop.
-   */
-  void launchingJob(Job jobToSubmit);
-
-  /**
-   * Callback when job finishes.
-   * @param submittedJob Job that ran in hadoop.
-   * @param passed true if job succeeded.
-   */
-  void jobFinished(Job submittedJob, boolean passed);
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java
deleted file mode 100644
index 213a69a..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.bsp.BspUtils;
-import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.io.EdgeInputFormat;
-import org.apache.giraph.io.VertexInputFormat;
-import org.apache.giraph.io.VertexOutputFormat;
-import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.giraph.vertex.Vertex;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import java.lang.reflect.Type;
-import java.util.List;
-
-/**
- * GiraphTypeValidator attempts to verify the consistency of
- * user-chosen InputFormat, OutputFormat, and Vertex type
- * parameters before the job run actually begins.
- *
- * @param <I> the Vertex ID type
- * @param <V> the Vertex Value type
- * @param <E> the Edge Value type
- * @param <M> the Message type
- */
-public class GiraphTypeValidator<I extends WritableComparable,
-  V extends Writable, E extends Writable, M extends Writable> {
-  /**
-   * Class logger object.
-   */
-  private static Logger LOG =
-    Logger.getLogger(GiraphTypeValidator.class);
-
-  /** I param vertex index in classList */
-  private static final int ID_PARAM_INDEX = 0;
-  /** V param vertex index in classList */
-  private static final int VALUE_PARAM_INDEX = 1;
-  /** E param vertex index in classList */
-  private static final int EDGE_PARAM_INDEX = 2;
-  /** M param vertex index in classList */
-  private static final int MSG_PARAM_INDEX = 3;
-  /** M param vertex combiner index in classList */
-  private static final int MSG_COMBINER_PARAM_INDEX = 1;
-  /** E param edge input format index in classList */
-  private static final int EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX = 1;
-
-  /** Vertex Index Type */
-  private Type vertexIndexType;
-  /** Vertex Index Type */
-  private Type vertexValueType;
-  /** Vertex Index Type */
-  private Type edgeValueType;
-  /** Vertex Index Type */
-  private Type messageValueType;
-
-  /**
-   * The Configuration object for use in the validation test.
-   */
-  private Configuration conf;
-
-  /**
-   * Constructor to execute the validation test, throws
-   * unchecked exception to end job run on failure.
-   *
-   * @param conf the Configuration for this run.
-   */
-  public GiraphTypeValidator(Configuration conf) {
-    this.conf = conf;
-  }
-
-  /**
-   * Make sure that all registered classes have matching types.  This
-   * is a little tricky due to type erasure, cannot simply get them from
-   * the class type arguments.  Also, set the vertex index, vertex value,
-   * edge value and message value classes.
-   */
-  public void validateClassTypes() {
-    Class<? extends Vertex<I, V, E, M>> vertexClass =
-      BspUtils.<I, V, E, M>getVertexClass(conf);
-    List<Class<?>> classList = ReflectionUtils.getTypeArguments(
-      Vertex.class, vertexClass);
-    vertexIndexType = classList.get(ID_PARAM_INDEX);
-    vertexValueType = classList.get(VALUE_PARAM_INDEX);
-    edgeValueType = classList.get(EDGE_PARAM_INDEX);
-    messageValueType = classList.get(MSG_PARAM_INDEX);
-    verifyVertexInputFormatGenericTypes();
-    verifyEdgeInputFormatGenericTypes();
-    verifyVertexOutputFormatGenericTypes();
-    verifyVertexResolverGenericTypes();
-    verifyVertexCombinerGenericTypes();
-  }
-
-  /** Verify matching generic types in VertexInputFormat. */
-  private void verifyVertexInputFormatGenericTypes() {
-    Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass =
-      BspUtils.<I, V, E, M>getVertexInputFormatClass(conf);
-    if (vertexInputFormatClass != null) {
-      List<Class<?>> classList =
-          ReflectionUtils.getTypeArguments(
-              VertexInputFormat.class, vertexInputFormatClass);
-      if (classList.get(ID_PARAM_INDEX) == null) {
-        LOG.warn("Input format vertex index type is not known");
-      } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
-        throw new IllegalArgumentException(
-            "checkClassTypes: Vertex index types don't match, " +
-                "vertex - " + vertexIndexType +
-                ", vertex input format - " + classList.get(ID_PARAM_INDEX));
-      }
-      if (classList.get(VALUE_PARAM_INDEX) == null) {
-        LOG.warn("Input format vertex value type is not known");
-      } else if (!vertexValueType.equals(classList.get(VALUE_PARAM_INDEX))) {
-        throw new IllegalArgumentException(
-            "checkClassTypes: Vertex value types don't match, " +
-                "vertex - " + vertexValueType +
-                ", vertex input format - " + classList.get(VALUE_PARAM_INDEX));
-      }
-      if (classList.get(EDGE_PARAM_INDEX) == null) {
-        LOG.warn("Input format edge value type is not known");
-      } else if (!edgeValueType.equals(classList.get(EDGE_PARAM_INDEX))) {
-        throw new IllegalArgumentException(
-            "checkClassTypes: Edge value types don't match, " +
-                "vertex - " + edgeValueType +
-                ", vertex input format - " + classList.get(EDGE_PARAM_INDEX));
-      }
-    }
-  }
-
-  /** Verify matching generic types in EdgeInputFormat. */
-  private void verifyEdgeInputFormatGenericTypes() {
-    Class<? extends EdgeInputFormat<I, E>> edgeInputFormatClass =
-        BspUtils.<I, E>getEdgeInputFormatClass(conf);
-    if (edgeInputFormatClass != null) {
-      List<Class<?>> classList =
-          ReflectionUtils.getTypeArguments(
-              EdgeInputFormat.class, edgeInputFormatClass);
-      if (classList.get(ID_PARAM_INDEX) == null) {
-        LOG.warn("Input format vertex index type is not known");
-      } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
-        throw new IllegalArgumentException(
-            "checkClassTypes: Vertex index types don't match, " +
-                "vertex - " + vertexIndexType +
-                ", edge input format - " + classList.get(ID_PARAM_INDEX));
-      }
-      if (classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX) == null) {
-        LOG.warn("Input format edge value type is not known");
-      } else if (!edgeValueType.equals(
-          classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX))) {
-        throw new IllegalArgumentException(
-            "checkClassTypes: Edge value types don't match, " +
-                "vertex - " + edgeValueType +
-                ", edge input format - " +
-                classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX));
-      }
-    }
-  }
-
-  /** If there is a combiner type, verify its generic params match the job. */
-  private void verifyVertexCombinerGenericTypes() {
-    Class<? extends Combiner<I, M>> vertexCombinerClass =
-      BspUtils.<I, M>getCombinerClass(conf);
-    if (vertexCombinerClass != null) {
-      List<Class<?>> classList =
-        ReflectionUtils.getTypeArguments(
-          Combiner.class, vertexCombinerClass);
-      if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
-        throw new IllegalArgumentException(
-          "checkClassTypes: Vertex index types don't match, " +
-            "vertex - " + vertexIndexType +
-            ", vertex combiner - " + classList.get(ID_PARAM_INDEX));
-      }
-      if (!messageValueType.equals(classList.get(MSG_COMBINER_PARAM_INDEX))) {
-        throw new IllegalArgumentException(
-          "checkClassTypes: Message value types don't match, " +
-            "vertex - " + messageValueType +
-            ", vertex combiner - " + classList.get(MSG_COMBINER_PARAM_INDEX));
-      }
-    }
-  }
-
-  /** Verify that the output format's generic params match the job. */
-  private void verifyVertexOutputFormatGenericTypes() {
-    Class<? extends VertexOutputFormat<I, V, E>>
-      vertexOutputFormatClass =
-      BspUtils.<I, V, E>getVertexOutputFormatClass(conf);
-    if (vertexOutputFormatClass != null) {
-      List<Class<?>> classList =
-        ReflectionUtils.getTypeArguments(
-          VertexOutputFormat.class, vertexOutputFormatClass);
-      if (classList.get(ID_PARAM_INDEX) == null) {
-        LOG.warn("Output format vertex index type is not known");
-      } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
-        throw new IllegalArgumentException(
-          "checkClassTypes: Vertex index types don't match, " +
-            "vertex - " + vertexIndexType +
-            ", vertex output format - " + classList.get(ID_PARAM_INDEX));
-      }
-      if (classList.get(VALUE_PARAM_INDEX) == null) {
-        LOG.warn("Output format vertex value type is not known");
-      } else if (!vertexValueType.equals(classList.get(VALUE_PARAM_INDEX))) {
-        throw new IllegalArgumentException(
-          "checkClassTypes: Vertex value types don't match, " +
-            "vertex - " + vertexValueType +
-            ", vertex output format - " + classList.get(VALUE_PARAM_INDEX));
-      }
-      if (classList.get(EDGE_PARAM_INDEX) == null) {
-        LOG.warn("Output format edge value type is not known");
-      } else if (!edgeValueType.equals(classList.get(EDGE_PARAM_INDEX))) {
-        throw new IllegalArgumentException(
-          "checkClassTypes: Edge value types don't match, " +
-            "vertex - " + edgeValueType +
-            ", vertex output format - " + classList.get(EDGE_PARAM_INDEX));
-      }
-    }
-  }
-
-  /** If there is a vertex resolver,
-   * validate the generic parameter types. */
-  private void verifyVertexResolverGenericTypes() {
-    Class<? extends VertexResolver<I, V, E, M>>
-      vrClass = BspUtils.<I, V, E, M>getVertexResolverClass(conf);
-    if (!DefaultVertexResolver.class.isAssignableFrom(vrClass)) {
-      return;
-    }
-    Class<? extends DefaultVertexResolver<I, V, E, M>>
-      dvrClass =
-        (Class<? extends DefaultVertexResolver<I, V, E, M>>) vrClass;
-    List<Class<?>> classList =
-      ReflectionUtils.getTypeArguments(
-          DefaultVertexResolver.class, dvrClass);
-    if (classList.get(ID_PARAM_INDEX) != null &&
-      !vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
-      throw new IllegalArgumentException(
-        "checkClassTypes: Vertex index types don't match, " +
-          "vertex - " + vertexIndexType +
-          ", vertex resolver - " + classList.get(ID_PARAM_INDEX));
-    }
-    if (classList.get(VALUE_PARAM_INDEX) != null &&
-      !vertexValueType.equals(classList.get(VALUE_PARAM_INDEX))) {
-      throw new IllegalArgumentException(
-        "checkClassTypes: Vertex value types don't match, " +
-          "vertex - " + vertexValueType +
-          ", vertex resolver - " + classList.get(VALUE_PARAM_INDEX));
-    }
-    if (classList.get(EDGE_PARAM_INDEX) != null &&
-      !edgeValueType.equals(classList.get(EDGE_PARAM_INDEX))) {
-      throw new IllegalArgumentException(
-        "checkClassTypes: Edge value types don't match, " +
-          "vertex - " + edgeValueType +
-          ", vertex resolver - " + classList.get(EDGE_PARAM_INDEX));
-    }
-    if (classList.get(MSG_PARAM_INDEX) != null &&
-      !messageValueType.equals(classList.get(MSG_PARAM_INDEX))) {
-      throw new IllegalArgumentException(
-        "checkClassTypes: Message value types don't match, " +
-          "vertex - " + messageValueType +
-          ", vertex resolver - " + classList.get(MSG_PARAM_INDEX));
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java
new file mode 100644
index 0000000..2e703ca
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.job;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Default implementation of job observer that does nothing.
+ */
+public class DefaultJobObserver implements GiraphJobObserver,
+    ImmutableClassesGiraphConfigurable {
+  /** configuration object stored here */
+  private ImmutableClassesGiraphConfiguration conf;
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration configuration) {
+    this.conf = configuration;
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public void launchingJob(Job jobToSubmit) {
+    // do nothing
+  }
+
+  @Override
+  public void jobFinished(Job jobToSubmit, boolean passed) {
+    // do nothing
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
new file mode 100644
index 0000000..62498c6
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.job;
+
+import org.apache.giraph.bsp.BspInputFormat;
+import org.apache.giraph.bsp.BspOutputFormat;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.DefaultVertexResolver;
+import org.apache.giraph.graph.GraphMapper;
+import org.apache.giraph.vertex.MutableVertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Generates an appropriate internal {@link Job} for using Giraph in Hadoop.
+ * Uses composition to avoid unwanted {@link Job} methods from exposure
+ * to the user.
+ */
+public class GiraphJob {
+  static {
+    Configuration.addDefaultResource("giraph-site.xml");
+  }
+
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(GiraphJob.class);
+  /** Internal delegated job to proxy interface requests for Job */
+  private final DelegatedJob delegatedJob;
+  /** Name of the job */
+  private final String jobName;
+  /** Helper configuration from the job */
+  private final GiraphConfiguration giraphConfiguration;
+
+  /**
+   * Delegated job that simply passes along the class GiraphConfiguration.
+   */
+  private class DelegatedJob extends Job {
+    /** Ensure that for job initiation the super.getConfiguration() is used */
+    private boolean jobInited = false;
+
+    /**
+     * Constructor
+     *
+     * @throws IOException
+     */
+    DelegatedJob() throws IOException { }
+
+    @Override
+    public Configuration getConfiguration() {
+      if (jobInited) {
+        return giraphConfiguration;
+      } else {
+        return super.getConfiguration();
+      }
+    }
+  }
+
+  /**
+   * Constructor that will instantiate the configuration
+   *
+   * @param jobName User-defined job name
+   * @throws IOException
+   */
+  public GiraphJob(String jobName) throws IOException {
+    this(new GiraphConfiguration(), jobName);
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param configuration User-defined configuration
+   * @param jobName User-defined job name
+   * @throws IOException
+   */
+  public GiraphJob(Configuration configuration,
+                   String jobName) throws IOException {
+    this(new GiraphConfiguration(configuration), jobName);
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param giraphConfiguration User-defined configuration
+   * @param jobName User-defined job name
+   * @throws IOException
+   */
+  public GiraphJob(GiraphConfiguration giraphConfiguration,
+                   String jobName) throws IOException {
+    this.jobName = jobName;
+    this.giraphConfiguration = giraphConfiguration;
+    this.delegatedJob = new DelegatedJob();
+  }
+
+  /**
+   * Get the configuration from the internal job.
+   *
+   * @return Configuration used by the job.
+   */
+  public GiraphConfiguration getConfiguration() {
+    return giraphConfiguration;
+  }
+
+  /**
+   * Be very cautious when using this method as it returns the internal job
+   * of {@link GiraphJob}.  This should only be used for methods that require
+   * access to the actual {@link Job}, i.e. FileInputFormat#addInputPath().
+   *
+   * @return Internal job that will actually be submitted to Hadoop.
+   */
+  public Job getInternalJob() {
+    delegatedJob.jobInited = true;
+    return delegatedJob;
+  }
+  /**
+   * Make sure the configuration is set properly by the user prior to
+   * submitting the job.
+   *
+   * @param conf Configuration to check
+   */
+  private void checkConfiguration(ImmutableClassesGiraphConfiguration conf) {
+    if (conf.getMaxWorkers() < 0) {
+      throw new RuntimeException("checkConfiguration: No valid " +
+          GiraphConstants.MAX_WORKERS);
+    }
+    if (conf.getMinPercentResponded() <= 0.0f ||
+        conf.getMinPercentResponded() > 100.0f) {
+      throw new IllegalArgumentException(
+          "checkConfiguration: Invalid " + conf.getMinPercentResponded() +
+              " for " + GiraphConstants.MIN_PERCENT_RESPONDED);
+    }
+    if (conf.getMinWorkers() < 0) {
+      throw new IllegalArgumentException("checkConfiguration: No valid " +
+          GiraphConstants.MIN_WORKERS);
+    }
+    if (conf.getVertexClass() == null) {
+      throw new IllegalArgumentException("checkConfiguration: Null" +
+          GiraphConstants.VERTEX_CLASS);
+    }
+    if (conf.getVertexInputFormatClass() == null &&
+        conf.getEdgeInputFormatClass() == null) {
+      throw new IllegalArgumentException("checkConfiguration: One of " +
+          GiraphConstants.VERTEX_INPUT_FORMAT_CLASS + " and " +
+          GiraphConstants.EDGE_INPUT_FORMAT_CLASS + " must be non-null");
+    }
+    if (conf.getEdgeInputFormatClass() != null &&
+        !(MutableVertex.class.isAssignableFrom(conf.getVertexClass()))) {
+      throw new IllegalArgumentException("checkConfiguration: EdgeInputFormat" +
+          " only works with mutable vertices");
+    }
+    if (conf.getVertexResolverClass() == null) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("checkConfiguration: No class found for " +
+            GiraphConstants.VERTEX_RESOLVER_CLASS + ", defaulting to " +
+            DefaultVertexResolver.class.getCanonicalName());
+      }
+    }
+  }
+
+
+  /**
+   * Check if the configuration is local.  If it is local, do additional
+   * checks due to the restrictions of LocalJobRunner.
+   *
+   * @param conf Configuration
+   */
+  private static void checkLocalJobRunnerConfiguration(
+      GiraphConfiguration conf) {
+    String jobTracker = conf.get("mapred.job.tracker", null);
+    if (!jobTracker.equals("local")) {
+      // Nothing to check
+      return;
+    }
+
+    int maxWorkers = conf.getMaxWorkers();
+    if (maxWorkers != 1) {
+      throw new IllegalArgumentException(
+          "checkLocalJobRunnerConfiguration: When using " +
+              "LocalJobRunner, must have only one worker since " +
+          "only 1 task at a time!");
+    }
+    if (conf.getSplitMasterWorker()) {
+      throw new IllegalArgumentException(
+          "checkLocalJobRunnerConfiguration: When using " +
+              "LocalJobRunner, you cannot run in split master / worker " +
+          "mode since there is only 1 task at a time!");
+    }
+  }
+
+  /**
+   * Check whether a specified int conf value is set and if not, set it.
+   *
+   * @param param Conf value to check
+   * @param defaultValue Assign to value if not set
+   */
+  private void setIntConfIfDefault(String param, int defaultValue) {
+    if (giraphConfiguration.getInt(param, Integer.MIN_VALUE) ==
+        Integer.MIN_VALUE) {
+      giraphConfiguration.setInt(param, defaultValue);
+    }
+  }
+
+  /**
+   * Runs the actual graph application through Hadoop Map-Reduce.
+   *
+   * @param verbose If true, provide verbose output, false otherwise
+   * @return True if success, false otherwise
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public final boolean run(boolean verbose)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    // Most users won't hit this hopefully and can set it higher if desired
+    setIntConfIfDefault("mapreduce.job.counters.limit", 512);
+
+    // Capacity scheduler-specific settings.  These should be enough for
+    // a reasonable Giraph job
+    setIntConfIfDefault("mapred.job.map.memory.mb", 1024);
+    setIntConfIfDefault("mapred.job.reduce.memory.mb", 1024);
+
+    // Speculative execution doesn't make sense for Giraph
+    giraphConfiguration.setBoolean(
+        "mapred.map.tasks.speculative.execution", false);
+
+    // Set the ping interval to 5 minutes instead of one minute
+    // (DEFAULT_PING_INTERVAL)
+    Client.setPingInterval(giraphConfiguration, 60000 * 5);
+
+    // Should work in MAPREDUCE-1938 to let the user jars/classes
+    // get loaded first
+    giraphConfiguration.setBoolean("mapreduce.user.classpath.first", true);
+    giraphConfiguration.setBoolean("mapreduce.job.user.classpath.first", true);
+
+    // If the checkpoint frequency is 0 (no failure handling), set the max
+    // tasks attempts to be 0 to encourage faster failure of unrecoverable jobs
+    if (giraphConfiguration.getCheckpointFrequency() == 0) {
+      int oldMaxTaskAttempts = giraphConfiguration.getMaxTaskAttempts();
+      giraphConfiguration.setMaxTaskAttempts(0);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("run: Since checkpointing is disabled (default), " +
+            "do not allow any task retries (setting " +
+            GiraphConstants.MAX_TASK_ATTEMPTS + " = 0, " +
+            "old value = " + oldMaxTaskAttempts + ")");
+      }
+    }
+
+    // Set the job properties, check them, and submit the job
+    ImmutableClassesGiraphConfiguration conf =
+        new ImmutableClassesGiraphConfiguration(giraphConfiguration);
+    checkConfiguration(conf);
+    checkLocalJobRunnerConfiguration(conf);
+    Job submittedJob = new Job(conf, jobName);
+    if (submittedJob.getJar() == null) {
+      submittedJob.setJarByClass(getClass());
+    }
+    submittedJob.setNumReduceTasks(0);
+    submittedJob.setMapperClass(GraphMapper.class);
+    submittedJob.setInputFormatClass(BspInputFormat.class);
+    submittedJob.setOutputFormatClass(BspOutputFormat.class);
+
+    GiraphJobObserver jobObserver = conf.getJobObserver();
+    jobObserver.launchingJob(submittedJob);
+    boolean passed = submittedJob.waitForCompletion(verbose);
+    jobObserver.jobFinished(submittedJob, passed);
+
+    return passed;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java
new file mode 100644
index 0000000..fbcc4f1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.job;
+
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * An observer over the job launch lifecycle.
+ */
+public interface GiraphJobObserver {
+  /**
+   * Callback for job about to start.
+   * @param jobToSubmit Job we're going to submit to hadoop.
+   */
+  void launchingJob(Job jobToSubmit);
+
+  /**
+   * Callback when job finishes.
+   * @param submittedJob Job that ran in hadoop.
+   * @param passed true if job succeeded.
+   */
+  void jobFinished(Job submittedJob, boolean passed);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/job/GiraphTypeValidator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphTypeValidator.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphTypeValidator.java
new file mode 100644
index 0000000..de2a66d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphTypeValidator.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.job;
+
+import org.apache.giraph.bsp.BspUtils;
+import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.graph.DefaultVertexResolver;
+import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import java.lang.reflect.Type;
+import java.util.List;
+
+/**
+ * GiraphTypeValidator attempts to verify the consistency of
+ * user-chosen InputFormat, OutputFormat, and Vertex type
+ * parameters before the job run actually begins.
+ *
+ * @param <I> the Vertex ID type
+ * @param <V> the Vertex Value type
+ * @param <E> the Edge Value type
+ * @param <M> the Message type
+ */
+public class GiraphTypeValidator<I extends WritableComparable,
+  V extends Writable, E extends Writable, M extends Writable> {
+  /**
+   * Class logger object.
+   */
+  private static Logger LOG =
+    Logger.getLogger(GiraphTypeValidator.class);
+
+  /** I param vertex index in classList */
+  private static final int ID_PARAM_INDEX = 0;
+  /** V param vertex index in classList */
+  private static final int VALUE_PARAM_INDEX = 1;
+  /** E param vertex index in classList */
+  private static final int EDGE_PARAM_INDEX = 2;
+  /** M param vertex index in classList */
+  private static final int MSG_PARAM_INDEX = 3;
+  /** M param vertex combiner index in classList */
+  private static final int MSG_COMBINER_PARAM_INDEX = 1;
+  /** E param edge input format index in classList */
+  private static final int EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX = 1;
+
+  /** Vertex Index Type */
+  private Type vertexIndexType;
+  /** Vertex Index Type */
+  private Type vertexValueType;
+  /** Vertex Index Type */
+  private Type edgeValueType;
+  /** Vertex Index Type */
+  private Type messageValueType;
+
+  /**
+   * The Configuration object for use in the validation test.
+   */
+  private Configuration conf;
+
+  /**
+   * Constructor to execute the validation test, throws
+   * unchecked exception to end job run on failure.
+   *
+   * @param conf the Configuration for this run.
+   */
+  public GiraphTypeValidator(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Make sure that all registered classes have matching types.  This
+   * is a little tricky due to type erasure, cannot simply get them from
+   * the class type arguments.  Also, set the vertex index, vertex value,
+   * edge value and message value classes.
+   */
+  public void validateClassTypes() {
+    Class<? extends Vertex<I, V, E, M>> vertexClass =
+      BspUtils.<I, V, E, M>getVertexClass(conf);
+    List<Class<?>> classList = ReflectionUtils.getTypeArguments(
+      Vertex.class, vertexClass);
+    vertexIndexType = classList.get(ID_PARAM_INDEX);
+    vertexValueType = classList.get(VALUE_PARAM_INDEX);
+    edgeValueType = classList.get(EDGE_PARAM_INDEX);
+    messageValueType = classList.get(MSG_PARAM_INDEX);
+    verifyVertexInputFormatGenericTypes();
+    verifyEdgeInputFormatGenericTypes();
+    verifyVertexOutputFormatGenericTypes();
+    verifyVertexResolverGenericTypes();
+    verifyVertexCombinerGenericTypes();
+  }
+
+  /** Verify matching generic types in VertexInputFormat. */
+  private void verifyVertexInputFormatGenericTypes() {
+    Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass =
+      BspUtils.<I, V, E, M>getVertexInputFormatClass(conf);
+    if (vertexInputFormatClass != null) {
+      List<Class<?>> classList =
+          ReflectionUtils.getTypeArguments(
+              VertexInputFormat.class, vertexInputFormatClass);
+      if (classList.get(ID_PARAM_INDEX) == null) {
+        LOG.warn("Input format vertex index type is not known");
+      } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
+        throw new IllegalArgumentException(
+            "checkClassTypes: Vertex index types don't match, " +
+                "vertex - " + vertexIndexType +
+                ", vertex input format - " + classList.get(ID_PARAM_INDEX));
+      }
+      if (classList.get(VALUE_PARAM_INDEX) == null) {
+        LOG.warn("Input format vertex value type is not known");
+      } else if (!vertexValueType.equals(classList.get(VALUE_PARAM_INDEX))) {
+        throw new IllegalArgumentException(
+            "checkClassTypes: Vertex value types don't match, " +
+                "vertex - " + vertexValueType +
+                ", vertex input format - " + classList.get(VALUE_PARAM_INDEX));
+      }
+      if (classList.get(EDGE_PARAM_INDEX) == null) {
+        LOG.warn("Input format edge value type is not known");
+      } else if (!edgeValueType.equals(classList.get(EDGE_PARAM_INDEX))) {
+        throw new IllegalArgumentException(
+            "checkClassTypes: Edge value types don't match, " +
+                "vertex - " + edgeValueType +
+                ", vertex input format - " + classList.get(EDGE_PARAM_INDEX));
+      }
+    }
+  }
+
+  /** Verify matching generic types in EdgeInputFormat. */
+  private void verifyEdgeInputFormatGenericTypes() {
+    Class<? extends EdgeInputFormat<I, E>> edgeInputFormatClass =
+        BspUtils.<I, E>getEdgeInputFormatClass(conf);
+    if (edgeInputFormatClass != null) {
+      List<Class<?>> classList =
+          ReflectionUtils.getTypeArguments(
+              EdgeInputFormat.class, edgeInputFormatClass);
+      if (classList.get(ID_PARAM_INDEX) == null) {
+        LOG.warn("Input format vertex index type is not known");
+      } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
+        throw new IllegalArgumentException(
+            "checkClassTypes: Vertex index types don't match, " +
+                "vertex - " + vertexIndexType +
+                ", edge input format - " + classList.get(ID_PARAM_INDEX));
+      }
+      if (classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX) == null) {
+        LOG.warn("Input format edge value type is not known");
+      } else if (!edgeValueType.equals(
+          classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX))) {
+        throw new IllegalArgumentException(
+            "checkClassTypes: Edge value types don't match, " +
+                "vertex - " + edgeValueType +
+                ", edge input format - " +
+                classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX));
+      }
+    }
+  }
+
+  /** If there is a combiner type, verify its generic params match the job. */
+  private void verifyVertexCombinerGenericTypes() {
+    Class<? extends Combiner<I, M>> vertexCombinerClass =
+      BspUtils.<I, M>getCombinerClass(conf);
+    if (vertexCombinerClass != null) {
+      List<Class<?>> classList =
+        ReflectionUtils.getTypeArguments(
+          Combiner.class, vertexCombinerClass);
+      if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
+        throw new IllegalArgumentException(
+          "checkClassTypes: Vertex index types don't match, " +
+            "vertex - " + vertexIndexType +
+            ", vertex combiner - " + classList.get(ID_PARAM_INDEX));
+      }
+      if (!messageValueType.equals(classList.get(MSG_COMBINER_PARAM_INDEX))) {
+        throw new IllegalArgumentException(
+          "checkClassTypes: Message value types don't match, " +
+            "vertex - " + messageValueType +
+            ", vertex combiner - " + classList.get(MSG_COMBINER_PARAM_INDEX));
+      }
+    }
+  }
+
+  /** Verify that the output format's generic params match the job. */
+  private void verifyVertexOutputFormatGenericTypes() {
+    Class<? extends VertexOutputFormat<I, V, E>>
+      vertexOutputFormatClass =
+      BspUtils.<I, V, E>getVertexOutputFormatClass(conf);
+    if (vertexOutputFormatClass != null) {
+      List<Class<?>> classList =
+        ReflectionUtils.getTypeArguments(
+          VertexOutputFormat.class, vertexOutputFormatClass);
+      if (classList.get(ID_PARAM_INDEX) == null) {
+        LOG.warn("Output format vertex index type is not known");
+      } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
+        throw new IllegalArgumentException(
+          "checkClassTypes: Vertex index types don't match, " +
+            "vertex - " + vertexIndexType +
+            ", vertex output format - " + classList.get(ID_PARAM_INDEX));
+      }
+      if (classList.get(VALUE_PARAM_INDEX) == null) {
+        LOG.warn("Output format vertex value type is not known");
+      } else if (!vertexValueType.equals(classList.get(VALUE_PARAM_INDEX))) {
+        throw new IllegalArgumentException(
+          "checkClassTypes: Vertex value types don't match, " +
+            "vertex - " + vertexValueType +
+            ", vertex output format - " + classList.get(VALUE_PARAM_INDEX));
+      }
+      if (classList.get(EDGE_PARAM_INDEX) == null) {
+        LOG.warn("Output format edge value type is not known");
+      } else if (!edgeValueType.equals(classList.get(EDGE_PARAM_INDEX))) {
+        throw new IllegalArgumentException(
+          "checkClassTypes: Edge value types don't match, " +
+            "vertex - " + edgeValueType +
+            ", vertex output format - " + classList.get(EDGE_PARAM_INDEX));
+      }
+    }
+  }
+
+  /** If there is a vertex resolver,
+   * validate the generic parameter types. */
+  private void verifyVertexResolverGenericTypes() {
+    Class<? extends VertexResolver<I, V, E, M>>
+      vrClass = BspUtils.<I, V, E, M>getVertexResolverClass(conf);
+    if (!DefaultVertexResolver.class.isAssignableFrom(vrClass)) {
+      return;
+    }
+    Class<? extends DefaultVertexResolver<I, V, E, M>>
+      dvrClass =
+        (Class<? extends DefaultVertexResolver<I, V, E, M>>) vrClass;
+    List<Class<?>> classList =
+      ReflectionUtils.getTypeArguments(
+          DefaultVertexResolver.class, dvrClass);
+    if (classList.get(ID_PARAM_INDEX) != null &&
+      !vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
+      throw new IllegalArgumentException(
+        "checkClassTypes: Vertex index types don't match, " +
+          "vertex - " + vertexIndexType +
+          ", vertex resolver - " + classList.get(ID_PARAM_INDEX));
+    }
+    if (classList.get(VALUE_PARAM_INDEX) != null &&
+      !vertexValueType.equals(classList.get(VALUE_PARAM_INDEX))) {
+      throw new IllegalArgumentException(
+        "checkClassTypes: Vertex value types don't match, " +
+          "vertex - " + vertexValueType +
+          ", vertex resolver - " + classList.get(VALUE_PARAM_INDEX));
+    }
+    if (classList.get(EDGE_PARAM_INDEX) != null &&
+      !edgeValueType.equals(classList.get(EDGE_PARAM_INDEX))) {
+      throw new IllegalArgumentException(
+        "checkClassTypes: Edge value types don't match, " +
+          "vertex - " + edgeValueType +
+          ", vertex resolver - " + classList.get(EDGE_PARAM_INDEX));
+    }
+    if (classList.get(MSG_PARAM_INDEX) != null &&
+      !messageValueType.equals(classList.get(MSG_PARAM_INDEX))) {
+      throw new IllegalArgumentException(
+        "checkClassTypes: Message value types don't match, " +
+          "vertex - " + messageValueType +
+          ", vertex resolver - " + classList.get(MSG_PARAM_INDEX));
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/job/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/package-info.java b/giraph-core/src/main/java/org/apache/giraph/job/package-info.java
new file mode 100644
index 0000000..c89d6af
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of all the classes involved in job launch.
+ */
+package org.apache.giraph.job;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index 6c7f34d..ae8ebf3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -21,7 +21,7 @@ package org.apache.giraph.utils;
 import org.apache.giraph.conf.GiraphClasses;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.io.formats.GiraphFileInputFormat;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/BspCase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/BspCase.java b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
index f43efe9..6aab533 100644
--- a/giraph-core/src/test/java/org/apache/giraph/BspCase.java
+++ b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
@@ -22,7 +22,7 @@ import org.apache.giraph.conf.GiraphClasses;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.examples.GeneratedVertexReader;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.utils.FileUtils;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestAutoCheckpoint.java b/giraph-core/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
index 87af297..efbe320 100644
--- a/giraph-core/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
+++ b/giraph-core/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
@@ -24,7 +24,7 @@ import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.examples.SimpleCheckpointVertex;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
index faf4126..0d6d1d0 100644
--- a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -35,7 +35,7 @@ import org.apache.giraph.examples.SimpleSuperstepVertex;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
 import org.apache.giraph.vertex.EdgeListVertex;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.worker.InputSplitPathOrganizer;
 import org.apache.giraph.aggregators.TextAggregatorWriter;
 import org.apache.giraph.vertex.Vertex;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/TestGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestGraphPartitioner.java b/giraph-core/src/test/java/org/apache/giraph/TestGraphPartitioner.java
index ff71b86..2e12bdc 100644
--- a/giraph-core/src/test/java/org/apache/giraph/TestGraphPartitioner.java
+++ b/giraph-core/src/test/java/org/apache/giraph/TestGraphPartitioner.java
@@ -23,7 +23,7 @@ import org.apache.giraph.examples.GeneratedVertexReader;
 import org.apache.giraph.examples.SimpleCheckpointVertex;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.partition.HashRangePartitionerFactory;
 import org.apache.giraph.partition.PartitionBalancer;
 import org.apache.giraph.integration.SuperstepHashPartitionerFactory;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/TestManualCheckpoint.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestManualCheckpoint.java b/giraph-core/src/test/java/org/apache/giraph/TestManualCheckpoint.java
index 7d3dde3..eb2338c 100644
--- a/giraph-core/src/test/java/org/apache/giraph/TestManualCheckpoint.java
+++ b/giraph-core/src/test/java/org/apache/giraph/TestManualCheckpoint.java
@@ -23,7 +23,7 @@ import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.examples.SimpleCheckpointVertex;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/TestMutateGraph.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestMutateGraph.java b/giraph-core/src/test/java/org/apache/giraph/TestMutateGraph.java
index 7bac9e8..0427b85 100644
--- a/giraph-core/src/test/java/org/apache/giraph/TestMutateGraph.java
+++ b/giraph-core/src/test/java/org/apache/giraph/TestMutateGraph.java
@@ -22,7 +22,7 @@ import org.apache.giraph.conf.GiraphClasses;
 import org.apache.giraph.examples.SimpleMutateGraphVertex;
 import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
 import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexOutputFormat;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java b/giraph-core/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
index 7654305..759624b 100644
--- a/giraph-core/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
+++ b/giraph-core/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
@@ -22,7 +22,7 @@ import org.apache.giraph.conf.GiraphClasses;
 import org.apache.giraph.examples.SimpleCheckpointVertex;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
index 40db41f..7deeb42 100644
--- a/giraph-core/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
+++ b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
@@ -26,7 +26,7 @@ import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.examples.AggregatorsTestVertex;
 import org.apache.giraph.examples.SimpleCheckpointVertex;
 import org.apache.giraph.examples.SimplePageRankVertex;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.master.MasterAggregatorHandler;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/examples/TestPageRank.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/examples/TestPageRank.java b/giraph-core/src/test/java/org/apache/giraph/examples/TestPageRank.java
index 3909f46..5e61596 100644
--- a/giraph-core/src/test/java/org/apache/giraph/examples/TestPageRank.java
+++ b/giraph-core/src/test/java/org/apache/giraph/examples/TestPageRank.java
@@ -20,7 +20,7 @@ package org.apache.giraph.examples;
 import org.apache.giraph.BspCase;
 import org.apache.giraph.conf.GiraphClasses;
 import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.partition.HashMasterPartitioner;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java b/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
index 13f4fe1..e2f6fd3 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
@@ -21,7 +21,7 @@ import org.apache.giraph.BspCase;
 import org.apache.giraph.benchmark.EdgeListVertexPageRankBenchmark;
 import org.apache.giraph.benchmark.PageRankComputation;
 import org.apache.giraph.conf.GiraphClasses;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.io.formats.GiraphFileInputFormat;
 import org.apache.giraph.io.formats.JsonBase64VertexInputFormat;
 import org.apache.giraph.io.formats.JsonBase64VertexOutputFormat;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java
index 185ba50..a0b16e2 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java
@@ -19,7 +19,7 @@ package org.apache.giraph.partition;
 
 import org.apache.giraph.graph.DefaultEdge;
 import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.graph.GiraphTransferRegulator;
 import org.apache.giraph.vertex.EdgeListVertex;
 import org.apache.hadoop.io.DoubleWritable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-core/src/test/java/org/apache/giraph/vertex/TestVertexTypes.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/vertex/TestVertexTypes.java b/giraph-core/src/test/java/org/apache/giraph/vertex/TestVertexTypes.java
index 1a260d5..80187ef 100644
--- a/giraph-core/src/test/java/org/apache/giraph/vertex/TestVertexTypes.java
+++ b/giraph-core/src/test/java/org/apache/giraph/vertex/TestVertexTypes.java
@@ -21,7 +21,7 @@ package org.apache.giraph.vertex;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
 import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.graph.GiraphTypeValidator;
+import org.apache.giraph.job.GiraphTypeValidator;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.formats.GeneratedVertexInputFormat;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/TestHBaseRootMarkerVertextFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/TestHBaseRootMarkerVertextFormat.java b/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/TestHBaseRootMarkerVertextFormat.java
index cc7d148..fe5b72e 100644
--- a/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/TestHBaseRootMarkerVertextFormat.java
+++ b/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/TestHBaseRootMarkerVertextFormat.java
@@ -22,7 +22,7 @@ package org.apache.giraph.io.hbase;
 import org.apache.giraph.BspCase;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.vertex.EdgeListVertex;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.io.hbase.edgemarker.TableEdgeInputFormat;
 import org.apache.giraph.io.hbase.edgemarker.TableEdgeOutputFormat;
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/giraph/blob/0c77d52f/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java
index 20d13ec..fbcef72 100644
--- a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java
+++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java
@@ -25,7 +25,7 @@ import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.giraph.io.EdgeInputFormat;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.vertex.Vertex;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;