You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/09/25 19:40:21 UTC

svn commit: r1390014 [1/4] - in /giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/comm/messages/ src/...

Author: aching
Date: Tue Sep 25 17:40:18 2012
New Revision: 1390014

URL: http://svn.apache.org/viewvc?rev=1390014&view=rev
Log:
GIRAPH-337: Make a specific Giraph configuration for Class caching and
specific Giraph configuration

Added:
    giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java
      - copied, changed from r1388628, giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
    giraph/trunk/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfigurable.java
      - copied, changed from r1388628, giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java
    giraph/trunk/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
    giraph/trunk/src/main/java/org/apache/giraph/benchmark/EdgeListVertexPageRankBenchmark.java
      - copied, changed from r1388628, giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/GiraphRunner.java
    giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
    giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
    giraph/trunk/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
    giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestReservedMap.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMessagesRequest.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java
    giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
    giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
    giraph/trunk/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
    giraph/trunk/src/main/java/org/apache/giraph/utils/WritableUtils.java
    giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
    giraph/trunk/src/test/java/org/apache/giraph/BspCase.java
    giraph/trunk/src/test/java/org/apache/giraph/TestAggregatorsHandling.java
    giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
    giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
    giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java
    giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java
    giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java
    giraph/trunk/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
    giraph/trunk/src/test/java/org/apache/giraph/TestVertexTypes.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/RPCCommunicationsTest.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/TestMessageStores.java
    giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiRpcBindingPortsTest.java
    giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java
    giraph/trunk/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java
    giraph/trunk/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
    giraph/trunk/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
    giraph/trunk/src/test/java/zk/TestZooKeeperManager.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Sep 25 17:40:18 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-337: Make a specific Giraph configuration for Class caching
+  and specific Giraph configuration. (aching)
+
   GIRAPH-334: Bugfix HCatalog Hive profile. (nitayj via aching)
 
   GIRAPH-93: Hive input / output format. (nitayj via aching)

Copied: giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java (from r1388628, giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java)
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java?p2=giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java&p1=giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java&r1=1388628&r2=1390014&rev=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java Tue Sep 25 17:40:18 2012
@@ -16,28 +16,25 @@
  * limitations under the License.
  */
 
-package org.apache.giraph.graph;
+package org.apache.giraph;
 
-import org.apache.giraph.bsp.BspInputFormat;
-import org.apache.giraph.bsp.BspOutputFormat;
+import org.apache.giraph.graph.AggregatorWriter;
+import org.apache.giraph.graph.MasterCompute;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexCombiner;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.graph.WorkerContext;
 import org.apache.giraph.graph.partition.GraphPartitionerFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
 
 /**
- * Generates an appropriate internal {@link Job} for using Giraph in Hadoop.
- * Uses composition to avoid unwanted {@link Job} methods from exposure
- * to the user.
+ * Adds user methods specific to Giraph.  This will be put into an
+ * ImmutableClassesGiraphConfiguration that provides the configuration plus
+ * the immutable classes.
  */
-public class GiraphJob {
-  static {
-    Configuration.addDefaultResource("giraph-site.xml");
-  }
-
+public class GiraphConfiguration extends Configuration {
   /** Vertex class - required */
   public static final String VERTEX_CLASS = "giraph.vertexClass";
   /** VertexInputFormat class - required */
@@ -172,6 +169,71 @@ public class GiraphJob {
    */
   public static final int TCP_BACKLOG_DEFAULT = 1;
 
+  /** How big to make the default buffer? */
+  public static final String NETTY_REQUEST_ENCODER_BUFFER_SIZE =
+      "giraph.nettyRequestEncoderBufferSize";
+  /** Start with 32K */
+  public static final int NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT =
+      32 * 1024;
+
+  /** Netty client threads */
+  public static final String NETTY_CLIENT_THREADS =
+      "giraph.nettyClientThreads";
+  /** Default is 4 */
+  public static final int NETTY_CLIENT_THREADS_DEFAULT = 4;
+
+  /** Netty server threads */
+  public static final String NETTY_SERVER_THREADS =
+      "giraph.nettyServerThreads";
+  /** Default is 16 */
+  public static final int NETTY_SERVER_THREADS_DEFAULT = 16;
+
+  /** Use the execution handler in netty on the client? */
+  public static final String NETTY_CLIENT_USE_EXECUTION_HANDLER =
+      "giraph.nettyClientUseExecutionHandler";
+  /** Use the execution handler in netty on the client - default true */
+  public static final boolean NETTY_CLIENT_USE_EXECUTION_HANDLER_DEFAULT =
+      true;
+
+  /** Netty client execution threads (execution handler) */
+  public static final String NETTY_CLIENT_EXECUTION_THREADS =
+      "giraph.nettyClientExecutionThreads";
+  /** Default Netty client execution threads (execution handler) of 8 */
+  public static final int NETTY_CLIENT_EXECUTION_THREADS_DEFAULT = 8;
+
+  /** Where to place the netty client execution handle? */
+  public static final String NETTY_CLIENT_EXECUTION_AFTER_HANDLER =
+      "giraph.nettyClientExecutionAfterHandler";
+  /**
+   * Default is to use the netty client execution handle after the request
+   * encoder.
+   */
+  public static final String NETTY_CLIENT_EXECUTION_AFTER_HANDLER_DEFAULT =
+      "requestEncoder";
+
+  /** Use the execution handler in netty on the server? */
+  public static final String NETTY_SERVER_USE_EXECUTION_HANDLER =
+      "giraph.nettyServerUseExecutionHandler";
+  /** Use the execution handler in netty on the server - default true */
+  public static final boolean NETTY_SERVER_USE_EXECUTION_HANDLER_DEFAULT =
+      true;
+
+  /** Netty server execution threads (execution handler) */
+  public static final String NETTY_SERVER_EXECUTION_THREADS =
+      "giraph.nettyServerExecutionThreads";
+  /** Default Netty server execution threads (execution handler) of 8 */
+  public static final int NETTY_SERVER_EXECUTION_THREADS_DEFAULT = 8;
+
+  /** Where to place the netty server execution handle? */
+  public static final String NETTY_SERVER_EXECUTION_AFTER_HANDLER =
+      "giraph.nettyServerExecutionAfterHandler";
+  /**
+   * Default is to use the netty server execution handle after the request
+   * frame decoder.
+   */
+  public static final String NETTY_SERVER_EXECUTION_AFTER_HANDLER_DEFAULT =
+      "requestFrameDecoder";
+
   /** Netty simulate a first request closed */
   public static final String NETTY_SIMULATE_FIRST_REQUEST_CLOSED =
       "giraph.nettySimulateFirstRequestClosed";
@@ -483,92 +545,18 @@ public class GiraphJob {
   /** Default ZooKeeper maximum session timeout of 10 minutes (in msecs). */
   public static final int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 600 * 1000;
 
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(GiraphJob.class);
-
-  /** Internal job that actually is submitted */
-  private final Job job;
-  /** Helper configuration from the job */
-  private final Configuration conf;
-
-
   /**
-   * Constructor that will instantiate the configuration
-   *
-   * @param jobName User-defined job name
-   * @throws IOException
+   * Constructor that creates the configuration
    */
-  public GiraphJob(String jobName) throws IOException {
-    this(new Configuration(), jobName);
-  }
+  public GiraphConfiguration() { }
 
   /**
    * Constructor.
    *
-   * @param conf User-defined configuration
-   * @param jobName User-defined job name
-   * @throws IOException
-   */
-  public GiraphJob(Configuration conf, String jobName) throws IOException {
-    job = new Job(conf, jobName);
-    this.conf = job.getConfiguration();
-  }
-
-  /**
-   * Get the configuration from the internal job.
-   *
-   * @return Configuration used by the job.
-   */
-  public Configuration getConfiguration() {
-    return conf;
-  }
-
-  /**
-   * Be very cautious when using this method as it returns the internal job
-   * of {@link GiraphJob}.  This should only be used for methods that require
-   * access to the actual {@link Job}, i.e. FileInputFormat#addInputPath().
-   *
-   * @return Internal job that will actually be submitted to Hadoop.
-   */
-  public Job getInternalJob() {
-    return job;
-  }
-  /**
-   * Make sure the configuration is set properly by the user prior to
-   * submitting the job.
+   * @param conf Configuration
    */
-  private void checkConfiguration() {
-    if (conf.getInt(MAX_WORKERS, -1) < 0) {
-      throw new RuntimeException("No valid " + MAX_WORKERS);
-    }
-    if (conf.getFloat(MIN_PERCENT_RESPONDED,
-        MIN_PERCENT_RESPONDED_DEFAULT) <= 0.0f ||
-        conf.getFloat(MIN_PERCENT_RESPONDED,
-            MIN_PERCENT_RESPONDED_DEFAULT) > 100.0f) {
-      throw new IllegalArgumentException(
-          "Invalid " +
-              conf.getFloat(MIN_PERCENT_RESPONDED,
-                  MIN_PERCENT_RESPONDED_DEFAULT) + " for " +
-                  MIN_PERCENT_RESPONDED);
-    }
-    if (conf.getInt(MIN_WORKERS, -1) < 0) {
-      throw new IllegalArgumentException("No valid " + MIN_WORKERS);
-    }
-    if (BspUtils.getVertexClass(getConfiguration()) == null) {
-      throw new IllegalArgumentException("GiraphJob: Null VERTEX_CLASS");
-    }
-    if (BspUtils.getVertexInputFormatClass(getConfiguration()) == null) {
-      throw new IllegalArgumentException(
-          "GiraphJob: Null VERTEX_INPUT_FORMAT_CLASS");
-    }
-    if (BspUtils.getVertexResolverClass(getConfiguration()) == null) {
-      setVertexResolverClass(VertexResolver.class);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("GiraphJob: No class found for " +
-            VERTEX_RESOLVER_CLASS + ", defaulting to " +
-            VertexResolver.class.getCanonicalName());
-      }
-    }
+  public GiraphConfiguration(Configuration conf) {
+    super(conf);
   }
 
   /**
@@ -576,8 +564,9 @@ public class GiraphJob {
    *
    * @param vertexClass Runs vertex computation
    */
-  public final void setVertexClass(Class<?> vertexClass) {
-    getConfiguration().setClass(VERTEX_CLASS, vertexClass, Vertex.class);
+  public final void setVertexClass(
+      Class<? extends Vertex> vertexClass) {
+    setClass(VERTEX_CLASS, vertexClass, Vertex.class);
   }
 
   /**
@@ -586,8 +575,8 @@ public class GiraphJob {
    * @param vertexInputFormatClass Determines how graph is input
    */
   public final void setVertexInputFormatClass(
-      Class<?> vertexInputFormatClass) {
-    getConfiguration().setClass(VERTEX_INPUT_FORMAT_CLASS,
+      Class<? extends VertexInputFormat> vertexInputFormatClass) {
+    setClass(VERTEX_INPUT_FORMAT_CLASS,
         vertexInputFormatClass,
         VertexInputFormat.class);
   }
@@ -597,8 +586,9 @@ public class GiraphJob {
    *
    * @param masterComputeClass Runs master computation
    */
-  public final void setMasterComputeClass(Class<?> masterComputeClass) {
-    getConfiguration().setClass(MASTER_COMPUTE_CLASS, masterComputeClass,
+  public final void setMasterComputeClass(
+      Class<? extends MasterCompute> masterComputeClass) {
+    setClass(MASTER_COMPUTE_CLASS, masterComputeClass,
         MasterCompute.class);
   }
 
@@ -608,8 +598,8 @@ public class GiraphJob {
    * @param vertexOutputFormatClass Determines how graph is output
    */
   public final void setVertexOutputFormatClass(
-      Class<?> vertexOutputFormatClass) {
-    getConfiguration().setClass(VERTEX_OUTPUT_FORMAT_CLASS,
+      Class<? extends VertexOutputFormat> vertexOutputFormatClass) {
+    setClass(VERTEX_OUTPUT_FORMAT_CLASS,
         vertexOutputFormatClass,
         VertexOutputFormat.class);
   }
@@ -619,8 +609,9 @@ public class GiraphJob {
    *
    * @param vertexCombinerClass Determines how vertex messages are combined
    */
-  public final void setVertexCombinerClass(Class<?> vertexCombinerClass) {
-    getConfiguration().setClass(VERTEX_COMBINER_CLASS,
+  public final void setVertexCombinerClass(
+      Class<? extends VertexCombiner> vertexCombinerClass) {
+    setClass(VERTEX_COMBINER_CLASS,
         vertexCombinerClass,
         VertexCombiner.class);
   }
@@ -632,7 +623,7 @@ public class GiraphJob {
    */
   public final void setGraphPartitionerFactoryClass(
       Class<?> graphPartitionerFactoryClass) {
-    getConfiguration().setClass(GRAPH_PARTITIONER_FACTORY_CLASS,
+    setClass(GRAPH_PARTITIONER_FACTORY_CLASS,
         graphPartitionerFactoryClass,
         GraphPartitionerFactory.class);
   }
@@ -642,8 +633,9 @@ public class GiraphJob {
    *
    * @param vertexResolverClass Determines how vertex mutations are resolved
    */
-  public final void setVertexResolverClass(Class<?> vertexResolverClass) {
-    getConfiguration().setClass(VERTEX_RESOLVER_CLASS,
+  public final void setVertexResolverClass(
+      Class<? extends VertexResolver> vertexResolverClass) {
+    setClass(VERTEX_RESOLVER_CLASS,
         vertexResolverClass,
         VertexResolver.class);
   }
@@ -654,8 +646,9 @@ public class GiraphJob {
    * @param workerContextClass Determines what code is executed on a each
    *        worker before and after each superstep and computation
    */
-  public final void setWorkerContextClass(Class<?> workerContextClass) {
-    getConfiguration().setClass(WORKER_CONTEXT_CLASS,
+  public final void setWorkerContextClass(
+      Class<? extends WorkerContext> workerContextClass) {
+    setClass(WORKER_CONTEXT_CLASS,
         workerContextClass,
         WorkerContext.class);
   }
@@ -667,8 +660,8 @@ public class GiraphJob {
    *        written to file at the end of the job
    */
   public final void setAggregatorWriterClass(
-      Class<?> aggregatorWriterClass) {
-    getConfiguration().setClass(AGGREGATOR_WRITER_CLASS,
+      Class<? extends AggregatorWriter> aggregatorWriterClass) {
+    setClass(AGGREGATOR_WRITER_CLASS,
         aggregatorWriterClass,
         AggregatorWriter.class);
   }
@@ -684,11 +677,23 @@ public class GiraphJob {
    *        have responded before continuing the superstep
    */
   public final void setWorkerConfiguration(int minWorkers,
-      int maxWorkers,
-      float minPercentResponded) {
-    conf.setInt(MIN_WORKERS, minWorkers);
-    conf.setInt(MAX_WORKERS, maxWorkers);
-    conf.setFloat(MIN_PERCENT_RESPONDED, minPercentResponded);
+                                           int maxWorkers,
+                                           float minPercentResponded) {
+    setInt(MIN_WORKERS, minWorkers);
+    setInt(MAX_WORKERS, maxWorkers);
+    setFloat(MIN_PERCENT_RESPONDED, minPercentResponded);
+  }
+
+  public final int getMinWorkers() {
+    return getInt(MIN_WORKERS, -1);
+  }
+
+  public final int getMaxWorkers() {
+    return getInt(MAX_WORKERS, -1);
+  }
+
+  public final float getMinPercentResponded() {
+    return getFloat(MIN_PERCENT_RESPONDED, MIN_PERCENT_RESPONDED_DEFAULT);
   }
 
   /**
@@ -699,90 +704,87 @@ public class GiraphJob {
    *        (i.e. zk1:2221,zk2:2221)
    */
   public final void setZooKeeperConfiguration(String serverList) {
-    conf.set(ZOOKEEPER_LIST, serverList);
+    set(ZOOKEEPER_LIST, serverList);
+  }
+
+  public final boolean getSplitMasterWorker() {
+    return getBoolean(SPLIT_MASTER_WORKER, SPLIT_MASTER_WORKER_DEFAULT);
   }
 
   /**
-   * Check if the configuration is local.  If it is local, do additional
-   * checks due to the restrictions of LocalJobRunner.
+   * Get the task partition
    *
-   * @param conf Configuration
+   * @return The task partition or -1 if not set
    */
-  private static void checkLocalJobRunnerConfiguration(
-      Configuration conf) {
-    String jobTracker = conf.get("mapred.job.tracker", null);
-    if (!jobTracker.equals("local")) {
-      // Nothing to check
-      return;
-    }
-
-    int maxWorkers = conf.getInt(MAX_WORKERS, -1);
-    if (maxWorkers != 1) {
-      throw new IllegalArgumentException(
-          "checkLocalJobRunnerConfiguration: When using " +
-              "LocalJobRunner, must have only one worker since " +
-          "only 1 task at a time!");
-    }
-    if (conf.getBoolean(SPLIT_MASTER_WORKER,
-        SPLIT_MASTER_WORKER_DEFAULT)) {
-      throw new IllegalArgumentException(
-          "checkLocalJobRunnerConfiguration: When using " +
-              "LocalJobRunner, you cannot run in split master / worker " +
-          "mode since there is only 1 task at a time!");
-    }
+  public int getTaskPartition() {
+    return getInt("mapred.task.partition", -1);
   }
 
   /**
-   * Check whether a specified int conf value is set and if not, set it.
+   * Get the ZooKeeper list.
    *
-   * @param param Conf value to check
-   * @param defaultValue Assign to value if not set
+   * @return ZooKeeper list of strings, comma separated or null if none set.
    */
-  private void setIntConfIfDefault(String param, int defaultValue) {
-    if (conf.getInt(param, Integer.MIN_VALUE) == Integer.MIN_VALUE) {
-      conf.setInt(param, defaultValue);
-    }
+  public String getZookeeperList() {
+    return get(ZOOKEEPER_LIST);
+  }
+
+  public String getLocalLevel() {
+    return get(LOG_LEVEL, LOG_LEVEL_DEFAULT);
+  }
+
+  public boolean getLocalTestMode() {
+    return getBoolean(LOCAL_TEST_MODE, LOCAL_TEST_MODE_DEFAULT);
+  }
+
+  public boolean getUseNetty() {
+    return getBoolean(USE_NETTY, USE_NETTY_DEFAULT);
+  }
+
+  public int getZooKeeperServerCount() {
+    return getInt(ZOOKEEPER_SERVER_COUNT,
+        ZOOKEEPER_SERVER_COUNT_DEFAULT);
   }
 
   /**
-   * Runs the actual graph application through Hadoop Map-Reduce.
+   * Set the ZooKeeper jar classpath
    *
-   * @param verbose If true, provide verbose output, false otherwise
-   * @return True if success, false otherwise
-   * @throws ClassNotFoundException
-   * @throws InterruptedException
-   * @throws IOException
-   */
-  public final boolean run(boolean verbose)
-    throws IOException, InterruptedException, ClassNotFoundException {
-    checkConfiguration();
-    checkLocalJobRunnerConfiguration(conf);
-    job.setNumReduceTasks(0);
-    // Most users won't hit this hopefully and can set it higher if desired
-    setIntConfIfDefault("mapreduce.job.counters.limit", 512);
-
-    // Capacity scheduler-specific settings.  These should be enough for
-    // a reasonable Giraph job
-    setIntConfIfDefault("mapred.job.map.memory.mb", 1024);
-    setIntConfIfDefault("mapred.job.reduce.memory.mb", 1024);
-
-    // Speculative execution doesn't make sense for Giraph
-    conf.setBoolean("mapred.map.tasks.speculative.execution", false);
-
-    // Set the ping interval to 5 minutes instead of one minute
-    // (DEFAULT_PING_INTERVAL)
-    Client.setPingInterval(conf, 60000 * 5);
+   * @param classPath Classpath for the ZooKeeper jar
+   */
+  public void setZooKeeperJar(String classPath) {
+    set(ZOOKEEPER_JAR, classPath);
+  }
+
+  public int getZooKeeperSessionTimeout() {
+    return getInt(ZOOKEEPER_SESSION_TIMEOUT,
+        ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
+  }
 
-    if (job.getJar() == null) {
-      job.setJarByClass(GiraphJob.class);
+  public boolean getNettyServerUseExecutionHandler() {
+    return getBoolean(NETTY_SERVER_USE_EXECUTION_HANDLER,
+        NETTY_SERVER_USE_EXECUTION_HANDLER_DEFAULT);
+  }
+
+  public int getNettyServerThreads() {
+    return getInt(NETTY_SERVER_THREADS, NETTY_SERVER_THREADS_DEFAULT);
+  }
+
+  public int getNettyServerExecutionThreads() {
+    return getInt(NETTY_SERVER_EXECUTION_THREADS,
+        NETTY_SERVER_EXECUTION_THREADS_DEFAULT);
+  }
+
+  /**
+   * Get the netty server execution concurrency.  This depends on whether the
+   * netty server execution handler exists.
+   *
+   * @return Server concurrency
+   */
+  public int getNettyServerExecutionConcurrency() {
+    if (getNettyServerUseExecutionHandler()) {
+      return getNettyServerExecutionThreads();
+    } else {
+      return getNettyServerThreads();
     }
-    // Should work in MAPREDUCE-1938 to let the user jars/classes
-    // get loaded first
-    conf.setBoolean("mapreduce.user.classpath.first", true);
-
-    job.setMapperClass(GraphMapper.class);
-    job.setInputFormatClass(BspInputFormat.class);
-    job.setOutputFormatClass(BspOutputFormat.class);
-    return job.waitForCompletion(verbose);
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/GiraphRunner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/GiraphRunner.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/GiraphRunner.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/GiraphRunner.java Tue Sep 25 17:40:18 2012
@@ -23,9 +23,15 @@ import org.apache.commons.cli.CommandLin
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.giraph.examples.Algorithm;
+import org.apache.giraph.graph.AggregatorWriter;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.GiraphTypeValidator;
+import org.apache.giraph.graph.MasterCompute;
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexCombiner;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.WorkerContext;
 import org.apache.giraph.utils.AnnotationUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
@@ -159,14 +165,21 @@ public class GiraphRunner implements Too
     }
 
     int workers = Integer.parseInt(cmd.getOptionValue('w'));
-    GiraphJob job = new GiraphJob(getConf(), "Giraph: " + vertexClassName);
-    job.setVertexClass(Class.forName(vertexClassName));
-    job.setVertexInputFormatClass(Class.forName(cmd.getOptionValue("if")));
-    job.setVertexOutputFormatClass(Class.forName(cmd.getOptionValue("of")));
+    GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
+    giraphConfiguration.setVertexClass(
+        (Class<? extends Vertex>) Class.forName(vertexClassName));
+    giraphConfiguration.setVertexInputFormatClass(
+        (Class<? extends VertexInputFormat>)
+            Class.forName(cmd.getOptionValue("if")));
+    giraphConfiguration.setVertexOutputFormatClass(
+        (Class<? extends VertexOutputFormat>)
+            Class.forName(cmd.getOptionValue("of")));
+    GiraphJob job = new GiraphJob(
+        giraphConfiguration, "Giraph: " + vertexClassName);
 
     if (cmd.hasOption("ip")) {
       FileInputFormat.addInputPath(job.getInternalJob(),
-                                   new Path(cmd.getOptionValue("ip")));
+          new Path(cmd.getOptionValue("ip")));
     } else {
       if (LOG.isInfoEnabled()) {
         LOG.info("No input path specified. Ensure your InputFormat does" +
@@ -185,19 +198,27 @@ public class GiraphRunner implements Too
     }
 
     if (cmd.hasOption("c")) {
-      job.setVertexCombinerClass(Class.forName(cmd.getOptionValue("c")));
+      giraphConfiguration.setVertexCombinerClass(
+          (Class<? extends VertexCombiner>)
+              Class.forName(cmd.getOptionValue("c")));
     }
 
     if (cmd.hasOption("wc")) {
-      job.setWorkerContextClass(Class.forName(cmd.getOptionValue("wc")));
+      giraphConfiguration.setWorkerContextClass(
+          (Class<? extends WorkerContext>)
+              Class.forName(cmd.getOptionValue("wc")));
     }
 
     if (cmd.hasOption("mc")) {
-      job.setMasterComputeClass(Class.forName(cmd.getOptionValue("mc")));
+      giraphConfiguration.setMasterComputeClass(
+          (Class<? extends MasterCompute>)
+              Class.forName(cmd.getOptionValue("mc")));
     }
 
     if (cmd.hasOption("aw")) {
-      job.setAggregatorWriterClass(Class.forName(cmd.getOptionValue("aw")));
+      giraphConfiguration.setAggregatorWriterClass(
+          (Class<? extends AggregatorWriter>)
+              Class.forName(cmd.getOptionValue("aw")));
     }
 
     if (cmd.hasOption("cf")) {
@@ -230,10 +251,9 @@ public class GiraphRunner implements Too
       new GiraphTypeValidator(job.getConfiguration());
     validator.validateClassTypes();
 
-    job.setWorkerConfiguration(workers, workers, 100.0f);
+    giraphConfiguration.setWorkerConfiguration(workers, workers, 100.0f);
 
     boolean verbose = !cmd.hasOption('q');
-
     return job.run(verbose) ? 0 : -1;
   }
 

Copied: giraph/trunk/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfigurable.java (from r1388628, giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java)
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfigurable.java?p2=giraph/trunk/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfigurable.java&p1=giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java&r1=1388628&r2=1390014&rev=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfigurable.java Tue Sep 25 17:40:18 2012
@@ -16,35 +16,34 @@
  * limitations under the License.
  */
 
-package org.apache.giraph.graph.partition;
+package org.apache.giraph;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 /**
- * Defines the partitioning framework for this application.
+ * Can be instantiated with ImmutableClassesGiraphConfiguration
  *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
  */
-@SuppressWarnings("rawtypes")
-public interface GraphPartitionerFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
+public interface ImmutableClassesGiraphConfigurable<
+    I extends WritableComparable, V extends Writable, E extends Writable,
+    M extends Writable> {
   /**
-   * Create the {@link MasterGraphPartitioner} used by the master.
-   * Instantiated once by the master and reused.
+   * Set the configuration to be used by this object.
    *
-   * @return Instantiated master graph partitioner
+   * @param configuration Set configuration
    */
-  MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner();
+  void setConf(ImmutableClassesGiraphConfiguration<I, V, E, M>
+                   configuration);
 
   /**
-   * Create the {@link WorkerGraphPartitioner} used by the worker.
-   * Instantiated once by every worker and reused.
+   * Return the configuration used by this object.
    *
-   * @return Instantiated worker graph partitioner
+   * @return Set configuration
    */
-  WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner();
+  ImmutableClassesGiraphConfiguration<I, V, E, M> getConf();
 }

Added: giraph/trunk/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java?rev=1390014&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java Tue Sep 25 17:40:18 2012
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import java.util.List;
+import org.apache.giraph.graph.AggregatorWriter;
+import org.apache.giraph.graph.DefaultMasterCompute;
+import org.apache.giraph.graph.DefaultWorkerContext;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.MasterCompute;
+import org.apache.giraph.graph.TextAggregatorWriter;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexCombiner;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.graph.WorkerContext;
+import org.apache.giraph.graph.partition.GraphPartitionerFactory;
+import org.apache.giraph.graph.partition.HashPartitionerFactory;
+import org.apache.giraph.graph.partition.MasterGraphPartitioner;
+import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * The classes set here are immutable, the remaining configuration is mutable.
+ * Classes are immutable and final to provide the best performance for
+ * instantiation.  Everything is thread-safe.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> extends
+    GiraphConfiguration {
+  /** Vertex class - cached for fast access */
+  private final Class<? extends Vertex<I, V, E, M>> vertexClass;
+  /** Vertex id class - cached for fast access */
+  private final Class<I> vertexIdClass;
+  /** Vertex value class - cached for fast access */
+  private final Class<V> vertexValueClass;
+  /** Edge value class - cached for fast access */
+  private final Class<E> edgeValueClass;
+  /** Message value class - cached for fast access */
+  private final Class<M> messageValueClass;
+
+  /** Graph partitioner factory class - cached for fast access */
+  private final Class<? extends GraphPartitionerFactory<I, V, E, M>>
+  graphPartitionerFactoryClass;
+  /** Master graph partitioner - cached for fast access */
+  private final MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner;
+
+  /** Vertex input format class - cached for fast access */
+  private final Class<? extends VertexInputFormat<I, V, E, M>>
+  vertexInputFormatClass;
+  /** Vertex output format class - cached for fast access */
+  private final Class<? extends VertexOutputFormat<I, V, E>>
+  vertexOutputFormatClass;
+
+  /** Aggregator writer class - cached for fast access */
+  private final Class<? extends AggregatorWriter> aggregatorWriterClass;
+  /** Vertex combiner class - cached for fast access */
+  private final Class<? extends VertexCombiner<I, M>> vertexCombinerClass;
+  /** Vertex resolver class - cached for fast access */
+  private final Class<? extends VertexResolver<I, V, E, M>>
+  vertexResolverClass;
+  /** Worker context class - cached for fast access */
+  private final Class<? extends WorkerContext> workerContextClass;
+  /** Master compute class - cached for fast access */
+  private final Class<? extends MasterCompute> masterComputeClass;
+
+  /**
+   * Constructor.  Takes the configuration and then gets the classes out of
+   * them for Giraph
+   *
+   * @param conf Configuration
+   */
+  public ImmutableClassesGiraphConfiguration(Configuration conf) {
+    super(conf);
+    // set pre-validated generic parameter types into Configuration
+    vertexClass = (Class<? extends Vertex<I, V, E, M>>)
+        conf.getClass(VERTEX_CLASS, null, Vertex.class);
+    List<Class<?>> classList =
+        org.apache.giraph.utils.ReflectionUtils.<Vertex>getTypeArguments(
+            Vertex.class, vertexClass);
+    vertexIdClass = (Class<I>) classList.get(0);
+    vertexValueClass = (Class<V>) classList.get(1);
+    edgeValueClass = (Class<E>) classList.get(2);
+    messageValueClass = (Class<M>) classList.get(3);
+
+    graphPartitionerFactoryClass =
+        (Class<? extends GraphPartitionerFactory<I, V, E, M>>)
+        conf.getClass(GRAPH_PARTITIONER_FACTORY_CLASS,
+            HashPartitionerFactory.class,
+            GraphPartitionerFactory.class);
+    masterGraphPartitioner =
+        (MasterGraphPartitioner<I, V, E, M>)
+            createGraphPartitioner().createMasterGraphPartitioner();
+
+    vertexInputFormatClass = (Class<? extends VertexInputFormat<I, V, E, M>>)
+        conf.getClass(VERTEX_INPUT_FORMAT_CLASS,
+        null, VertexInputFormat.class);
+    vertexOutputFormatClass = (Class<? extends VertexOutputFormat<I, V, E>>)
+        conf.getClass(VERTEX_OUTPUT_FORMAT_CLASS,
+        null, VertexOutputFormat.class);
+
+    aggregatorWriterClass = conf.getClass(AGGREGATOR_WRITER_CLASS,
+        TextAggregatorWriter.class, AggregatorWriter.class);
+    vertexCombinerClass = (Class<? extends VertexCombiner<I, M>>)
+        conf.getClass(VERTEX_COMBINER_CLASS,
+        null, VertexCombiner.class);
+    vertexResolverClass = (Class<? extends VertexResolver<I, V, E, M>>)
+        conf.getClass(VERTEX_RESOLVER_CLASS,
+        VertexResolver.class, VertexResolver.class);
+    workerContextClass = conf.getClass(WORKER_CONTEXT_CLASS,
+        DefaultWorkerContext.class, WorkerContext.class);
+    masterComputeClass =  conf.getClass(MASTER_COMPUTE_CLASS,
+        DefaultMasterCompute.class, MasterCompute.class);
+  }
+
+  /**
+   * Get the user's subclassed
+   * {@link org.apache.giraph.graph.partition.GraphPartitionerFactory}.
+   *
+   * @return User's graph partitioner
+   */
+  public Class<? extends GraphPartitionerFactory<I, V, E, M>>
+  getGraphPartitionerClass() {
+    return graphPartitionerFactoryClass;
+  }
+
+  /**
+   * Create a user graph partitioner class
+   *
+   * @return Instantiated user graph partitioner class
+   */
+  public GraphPartitionerFactory<I, V, E, M> createGraphPartitioner() {
+    return ReflectionUtils.newInstance(graphPartitionerFactoryClass, this);
+  }
+
+  /**
+   * Create a user graph partitioner partition stats class
+   *
+   * @return Instantiated user graph partition stats class
+   */
+  public PartitionStats createGraphPartitionStats() {
+    return masterGraphPartitioner.createPartitionStats();
+  }
+
+  /**
+   * Get the user's subclassed
+   * {@link org.apache.giraph.graph.VertexInputFormat}.
+   *
+   * @return User's vertex input format class
+   */
+  public Class<? extends VertexInputFormat<I, V, E, M>>
+  getVertexInputFormatClass() {
+    return vertexInputFormatClass;
+  }
+
+  /**
+   * Create a user vertex input format class
+   *
+   * @return Instantiated user vertex input format class
+   */
+  public VertexInputFormat<I, V, E, M>
+  createVertexInputFormat() {
+    return ReflectionUtils.newInstance(vertexInputFormatClass, this);
+  }
+
+  /**
+   * Get the user's subclassed
+   * {@link org.apache.giraph.graph.VertexOutputFormat}.
+   *
+   * @return User's vertex output format class
+   */
+  public Class<? extends VertexOutputFormat<I, V, E>>
+  getVertexOutputFormatClass() {
+    return vertexOutputFormatClass;
+  }
+
+  /**
+   * Create a user vertex output format class
+   *
+   * @return Instantiated user vertex output format class
+   */
+  @SuppressWarnings("rawtypes")
+  public VertexOutputFormat<I, V, E> createVertexOutputFormat() {
+    return ReflectionUtils.newInstance(vertexOutputFormatClass, this);
+  }
+
+  /**
+   * Get the user's subclassed {@link org.apache.giraph.graph.AggregatorWriter}.
+   *
+   * @return User's aggregator writer class
+   */
+  public Class<? extends AggregatorWriter> getAggregatorWriterClass() {
+    return aggregatorWriterClass;
+  }
+
+  /**
+   * Create a user aggregator output format class
+   *
+   * @return Instantiated user aggregator writer class
+   */
+  public AggregatorWriter createAggregatorWriter() {
+    return ReflectionUtils.newInstance(aggregatorWriterClass, this);
+  }
+
+  /**
+   * Get the user's subclassed {@link org.apache.giraph.graph.VertexCombiner}.
+   *
+   * @return User's vertex combiner class
+   */
+  public Class<? extends VertexCombiner<I, M>> getVertexCombinerClass() {
+    return vertexCombinerClass;
+  }
+
+  /**
+   * Create a user vertex combiner class
+   *
+   * @return Instantiated user vertex combiner class
+   */
+  @SuppressWarnings("rawtypes")
+  public VertexCombiner<I, M> createVertexCombiner() {
+    return ReflectionUtils.newInstance(vertexCombinerClass, this);
+  }
+
+  /**
+   * Get the user's subclassed VertexResolver.
+   *
+   * @return User's vertex resolver class
+   */
+  public Class<? extends VertexResolver<I, V, E, M>> getVertexResolverClass() {
+    return vertexResolverClass;
+  }
+
+  /**
+   * Create a user vertex revolver
+   *
+   * @param graphState State of the graph from the worker
+   * @return Instantiated user vertex resolver
+   */
+  @SuppressWarnings("rawtypes")
+  public VertexResolver<I, V, E, M> createVertexResolver(
+                       GraphState<I, V, E, M> graphState) {
+    VertexResolver<I, V, E, M> resolver =
+        ReflectionUtils.newInstance(vertexResolverClass, this);
+    resolver.setGraphState(graphState);
+    return resolver;
+  }
+
+  /**
+   * Get the user's subclassed WorkerContext.
+   *
+   * @return User's worker context class
+   */
+  public Class<? extends WorkerContext> getWorkerContextClass() {
+    return workerContextClass;
+  }
+
+  /**
+   * Create a user worker context
+   *
+   * @param graphState State of the graph from the worker
+   * @return Instantiated user worker context
+   */
+  @SuppressWarnings("rawtypes")
+  public WorkerContext createWorkerContext(GraphState<I, V, E, M> graphState) {
+    WorkerContext workerContext =
+        ReflectionUtils.newInstance(workerContextClass, this);
+    workerContext.setGraphState(graphState);
+    return workerContext;
+  }
+
+  /**
+   * Get the user's subclassed {@link org.apache.giraph.graph.MasterCompute}
+   *
+   * @return User's master class
+   */
+  public Class<? extends MasterCompute> getMasterComputeClass() {
+    return masterComputeClass;
+  }
+
+  /**
+   * Create a user master
+   *
+   * @return Instantiated user master
+   */
+  public MasterCompute createMasterCompute() {
+    return ReflectionUtils.newInstance(masterComputeClass, this);
+  }
+
+  /**
+   * Get the user's subclassed {@link org.apache.giraph.graph.Vertex}
+   *
+   * @return User's vertex class
+   */
+  public Class<? extends Vertex<I, V, E, M>> getVertexClass() {
+    return vertexClass;
+  }
+
+  /**
+   * Create a user vertex
+   *
+   * @return Instantiated user vertex
+   */
+  public Vertex<I, V, E, M> createVertex() {
+    return ReflectionUtils.newInstance(vertexClass, this);
+  }
+
+  /**
+   * Get the user's subclassed vertex index class.
+   *
+   * @return User's vertex index class
+   */
+  public Class<I> getVertexIdClass() {
+    return vertexIdClass;
+  }
+
+  /**
+   * Create a user vertex index
+   *
+   * @return Instantiated user vertex index
+   */
+  public I createVertexId() {
+    try {
+      return vertexIdClass.newInstance();
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException(
+          "createVertexId: Failed to instantiate", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException(
+          "createVertexId: Illegally accessed", e);
+    }
+  }
+
+  /**
+   * Get the user's subclassed vertex value class.
+   *
+   * @return User's vertex value class
+   */
+  public Class<V> getVertexValueClass() {
+    return vertexValueClass;
+  }
+
+  /**
+   * Create a user vertex value
+   *
+   * @return Instantiated user vertex value
+   */
+  @SuppressWarnings("unchecked")
+  public V createVertexValue() {
+    if (vertexValueClass == NullWritable.class) {
+      return (V) NullWritable.get();
+    } else {
+      try {
+        return vertexValueClass.newInstance();
+      } catch (InstantiationException e) {
+        throw new IllegalArgumentException(
+            "createVertexValue: Failed to instantiate", e);
+      } catch (IllegalAccessException e) {
+        throw new IllegalArgumentException(
+            "createVertexValue: Illegally accessed", e);
+      }
+    }
+  }
+
+  /**
+   * Get the user's subclassed edge value class.
+   *
+   * @return User's vertex edge value class
+   */
+  public Class<E> getEdgeValueClass() {
+    return edgeValueClass;
+  }
+
+  /**
+   * Create a user edge value
+   *
+   * @return Instantiated user edge value
+   */
+  public E createEdgeValue() {
+    if (edgeValueClass == NullWritable.class) {
+      return (E) NullWritable.get();
+    } else {
+      try {
+        return edgeValueClass.newInstance();
+      } catch (InstantiationException e) {
+        throw new IllegalArgumentException(
+            "createEdgeValue: Failed to instantiate", e);
+      } catch (IllegalAccessException e) {
+        throw new IllegalArgumentException(
+            "createEdgeValue: Illegally accessed", e);
+      }
+    }
+  }
+
+  /**
+   * Get the user's subclassed vertex message value class.
+   *
+   * @return User's vertex message value class
+   */
+  @SuppressWarnings("unchecked")
+  public Class<M> getMessageValueClass() {
+    return messageValueClass;
+  }
+
+  /**
+   * Create a user vertex message value
+   *
+   * @return Instantiated user vertex message value
+   */
+  public M createMessageValue() {
+    if (messageValueClass == NullWritable.class) {
+      return (M) NullWritable.get();
+    } else {
+      try {
+        return messageValueClass.newInstance();
+      } catch (InstantiationException e) {
+        throw new IllegalArgumentException(
+            "createMessageValue: Failed to instantiate", e);
+      } catch (IllegalAccessException e) {
+        throw new IllegalArgumentException(
+            "createMessageValue: Illegally accessed", e);
+      }
+    }
+  }
+}

Copied: giraph/trunk/src/main/java/org/apache/giraph/benchmark/EdgeListVertexPageRankBenchmark.java (from r1388628, giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java)
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/benchmark/EdgeListVertexPageRankBenchmark.java?p2=giraph/trunk/src/main/java/org/apache/giraph/benchmark/EdgeListVertexPageRankBenchmark.java&p1=giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java&r1=1388628&r2=1390014&rev=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/benchmark/EdgeListVertexPageRankBenchmark.java Tue Sep 25 17:40:18 2012
@@ -16,39 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.giraph.comm.netty;
+package org.apache.giraph.benchmark;
 
-import org.apache.giraph.comm.netty.handler.MasterRequestServerHandler;
-import org.apache.giraph.comm.MasterServer;
-import org.apache.hadoop.conf.Configuration;
-
-import java.net.InetSocketAddress;
+import java.io.IOException;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
 
 /**
- * Netty implementation of {@link MasterServer}
+ * Same benchmark code as {@link PageRankBenchmark}, but uses
+ * {@link org.apache.giraph.graph.EdgeListVertex} implementation rather than
+ * {@link org.apache.giraph.graph.HashMapVertex}
  */
-public class NettyMasterServer implements MasterServer {
-  /** Netty client that does the actual I/O */
-  private final NettyServer nettyServer;
-
-  /**
-   * Constructor
-   *
-   * @param conf Hadoop configuration
-   */
-  public NettyMasterServer(Configuration conf) {
-    nettyServer = new NettyServer(conf,
-        new MasterRequestServerHandler.Factory());
-    nettyServer.start();
-  }
-
-  @Override
-  public InetSocketAddress getMyAddress() {
-    return nettyServer.getMyAddress();
-  }
-
+public class EdgeListVertexPageRankBenchmark extends EdgeListVertex<
+    LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
   @Override
-  public void close() {
-    nettyServer.stop();
+  public void compute(Iterable<DoubleWritable> messages) throws
+      IOException {
+    PageRankComputation.computePageRank(this, messages);
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java Tue Sep 25 17:40:18 2012
@@ -23,37 +23,25 @@ import org.apache.commons.cli.CommandLin
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
+import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.examples.DoubleSumCombiner;
-import org.apache.giraph.graph.BspUtils;
-import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.io.PseudoRandomVertexInputFormat;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Logger;
 
-import java.io.IOException;
-
 /**
- * Default Pregel-style PageRank computation using a {@link EdgeListVertex}.
+ * Default Pregel-style PageRank computation.
  */
-public class PageRankBenchmark extends EdgeListVertex<
-    LongWritable, DoubleWritable, DoubleWritable, DoubleWritable>
-    implements Tool {
+public class PageRankBenchmark implements Tool {
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(PageRankBenchmark.class);
   /** Configuration from Configurable */
   private Configuration conf;
 
   @Override
-  public void compute(Iterable<DoubleWritable> messages) throws IOException {
-    PageRankComputation.computePageRank(this, messages);
-  }
-
-  @Override
   public Configuration getConf() {
     return conf;
   }
@@ -88,10 +76,15 @@ public class PageRankBenchmark extends E
         "vertexClass",
         true,
         "Vertex class (0 for HashMapVertex, 1 for EdgeListVertex)");
+    options.addOption("N",
+        "name",
+        true,
+        "Name of the job");
     options.addOption("nc",
         "noCombiner",
         false,
         "Don't use a combiner");
+
     HelpFormatter formatter = new HelpFormatter();
     if (args.length == 0) {
       formatter.printHelp(getClass().getName(), options, true);
@@ -122,20 +115,28 @@ public class PageRankBenchmark extends E
     }
 
     int workers = Integer.parseInt(cmd.getOptionValue('w'));
-    GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+    String name = getClass().getName();
+    if (cmd.hasOption("N")) {
+      name = name + " " + cmd.getOptionValue("N");
+    }
+
+    GiraphJob job = new GiraphJob(getConf(), name);
     if (!cmd.hasOption('c') ||
         (Integer.parseInt(cmd.getOptionValue('c')) == 1)) {
-      job.setVertexClass(PageRankBenchmark.class);
+      job.getConfiguration().setVertexClass(
+          EdgeListVertexPageRankBenchmark.class);
     } else {
-      job.setVertexClass(HashMapVertexPageRankBenchmark.class);
+      job.getConfiguration().setVertexClass(
+          HashMapVertexPageRankBenchmark.class);
     }
     LOG.info("Using class " +
-        BspUtils.getVertexClass(job.getConfiguration()).getName());
+        job.getConfiguration().get(GiraphConfiguration.VERTEX_CLASS));
     if (!cmd.hasOption("nc")) {
-      job.setVertexCombinerClass(DoubleSumCombiner.class);
+      job.getConfiguration().setVertexCombinerClass(DoubleSumCombiner.class);
     }
-    job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
-    job.setWorkerConfiguration(workers, workers, 100.0f);
+    job.getConfiguration().setVertexInputFormatClass(
+        PseudoRandomVertexInputFormat.class);
+    job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
     job.getConfiguration().setLong(
         PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
         Long.parseLong(cmd.getOptionValue('V')));

Modified: giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java Tue Sep 25 17:40:18 2012
@@ -23,6 +23,7 @@ import org.apache.commons.cli.CommandLin
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
+import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.aggregators.LongSumAggregator;
 import org.apache.giraph.graph.DefaultMasterCompute;
 import org.apache.giraph.graph.EdgeListVertex;
@@ -352,12 +353,15 @@ public class RandomMessageBenchmark impl
     }
     int workers = Integer.parseInt(cmd.getOptionValue('w'));
     GiraphJob job = new GiraphJob(getConf(), getClass().getName());
-    job.getConfiguration().setInt(GiraphJob.CHECKPOINT_FREQUENCY, 0);
-    job.setVertexClass(RandomMessageVertex.class);
-    job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
-    job.setWorkerContextClass(RandomMessageBenchmarkWorkerContext.class);
-    job.setMasterComputeClass(RandomMessageBenchmarkMasterCompute.class);
-    job.setWorkerConfiguration(workers, workers, 100.0f);
+    job.getConfiguration().setInt(GiraphConfiguration.CHECKPOINT_FREQUENCY, 0);
+    job.getConfiguration().setVertexClass(RandomMessageVertex.class);
+    job.getConfiguration().setVertexInputFormatClass(
+        PseudoRandomVertexInputFormat.class);
+    job.getConfiguration().setWorkerContextClass(
+        RandomMessageBenchmarkWorkerContext.class);
+    job.getConfiguration().setMasterComputeClass(
+        RandomMessageBenchmarkMasterCompute.class);
+    job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
     job.getConfiguration().setLong(
         PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
         Long.parseLong(cmd.getOptionValue('V')));
@@ -383,7 +387,8 @@ public class RandomMessageBenchmark impl
           Integer.parseInt(cmd.getOptionValue('s')));
     }
     if (cmd.hasOption('f')) {
-      job.getConfiguration().setInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+      job.getConfiguration().setInt(
+          GiraphConfiguration.MSG_NUM_FLUSH_THREADS,
           Integer.parseInt(cmd.getOptionValue('f')));
     }
     if (job.run(isVerbose)) {

Modified: giraph/trunk/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java Tue Sep 25 17:40:18 2012
@@ -23,8 +23,8 @@ import org.apache.commons.cli.CommandLin
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
+import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.examples.MinimumDoubleCombiner;
-import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.io.PseudoRandomVertexInputFormat;
@@ -40,19 +40,26 @@ import java.io.IOException;
 /**
  * Single-source shortest paths benchmark.
  */
-public class ShortestPathsBenchmark extends EdgeListVertex<LongWritable,
-    DoubleWritable, DoubleWritable, DoubleWritable> implements Tool {
+public class ShortestPathsBenchmark implements Tool {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(ShortestPathsBenchmark.class);
   /** Configuration */
   private Configuration conf;
 
-  @Override
-  public void compute(Iterable<DoubleWritable> messages) throws IOException {
-    ShortestPathsComputation.computeShortestPaths(this, messages);
+  /**
+   * Vertex implementation
+   */
+  public static class ShortestPathsBenchmarkVertex extends
+      EdgeListVertex<LongWritable, DoubleWritable, DoubleWritable,
+          DoubleWritable> {
+    @Override
+    public void compute(Iterable<DoubleWritable> messages) throws IOException {
+      ShortestPathsComputation.computeShortestPaths(this, messages);
+    }
   }
 
+
   @Override
   public Configuration getConf() {
     return conf;
@@ -117,17 +124,20 @@ public class ShortestPathsBenchmark exte
     GiraphJob job = new GiraphJob(getConf(), getClass().getName());
     if (!cmd.hasOption('c') ||
         (Integer.parseInt(cmd.getOptionValue('c')) == 1)) {
-      job.setVertexClass(ShortestPathsBenchmark.class);
+      job.getConfiguration().setVertexClass(ShortestPathsBenchmarkVertex.class);
     } else {
-      job.setVertexClass(HashMapVertexShortestPathsBenchmark.class);
+      job.getConfiguration().setVertexClass(
+          HashMapVertexShortestPathsBenchmark.class);
     }
     LOG.info("Using class " +
-        BspUtils.getVertexClass(job.getConfiguration()).getName());
-    job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
+        job.getConfiguration().get(GiraphConfiguration.VERTEX_CLASS));
+    job.getConfiguration().setVertexInputFormatClass(
+        PseudoRandomVertexInputFormat.class);
     if (!cmd.hasOption("nc")) {
-      job.setVertexCombinerClass(MinimumDoubleCombiner.class);
+      job.getConfiguration().setVertexCombinerClass(
+          MinimumDoubleCombiner.class);
     }
-    job.setWorkerConfiguration(workers, workers, 100.0f);
+    job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
     job.getConfiguration().setLong(
         PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
         Long.parseLong(cmd.getOptionValue('V')));

Modified: giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputFormat.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputFormat.java Tue Sep 25 17:40:18 2012
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.GiraphConfiguration;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -49,15 +49,15 @@ public class BspInputFormat extends Inpu
    * @return Maximum number of tasks
    */
   public static int getMaxTasks(Configuration conf) {
-    int maxWorkers = conf.getInt(GiraphJob.MAX_WORKERS, 0);
+    int maxWorkers = conf.getInt(GiraphConfiguration.MAX_WORKERS, 0);
     boolean splitMasterWorker =
-        conf.getBoolean(GiraphJob.SPLIT_MASTER_WORKER,
-            GiraphJob.SPLIT_MASTER_WORKER_DEFAULT);
+        conf.getBoolean(GiraphConfiguration.SPLIT_MASTER_WORKER,
+            GiraphConfiguration.SPLIT_MASTER_WORKER_DEFAULT);
     int maxTasks = maxWorkers;
     if (splitMasterWorker) {
       int zkServers =
-          conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT,
-              GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT);
+          conf.getInt(GiraphConfiguration.ZOOKEEPER_SERVER_COUNT,
+              GiraphConfiguration.ZOOKEEPER_SERVER_COUNT_DEFAULT);
       maxTasks += zkServers;
     }
     if (LOG.isDebugEnabled()) {

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Tue Sep 25 17:40:18 2012
@@ -18,10 +18,11 @@
 
 package org.apache.giraph.comm;
 
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexCombiner;
 import org.apache.giraph.graph.VertexMutations;
@@ -30,7 +31,6 @@ import org.apache.giraph.graph.WorkerInf
 import org.apache.giraph.graph.partition.Partition;
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.giraph.utils.MemoryUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.ipc.RPC;
@@ -86,7 +86,7 @@ public abstract class BasicRPCCommunicat
   /** Maximum number of vertices sent in a single RPC */
   private static final int MAX_VERTICES_PER_RPC = 1024;
   /** Hadoop configuration */
-  protected final Configuration conf;
+  protected final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
   /** Saved context for progress */
   private final Mapper<?, ?, ?, ?>.Context context;
   /** Indicates whether in superstep preparation */
@@ -438,22 +438,23 @@ public abstract class BasicRPCCommunicat
    * Only constructor.
    *
    * @param context Context for getting configuration
+   * @param configuration Configuration
    * @param service Service worker to get the vertex ranges
    * @throws IOException
-   * @throws UnknownHostException
    * @throws InterruptedException
    */
   public BasicRPCCommunications(Mapper<?, ?, ?, ?>.Context context,
+    ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
     CentralizedServiceWorker<I, V, E, M> service)
     throws IOException, InterruptedException {
     this.service = service;
     this.context = context;
-    this.conf = context.getConfiguration();
-    this.maxSize = conf.getInt(GiraphJob.MSG_SIZE,
-        GiraphJob.MSG_SIZE_DEFAULT);
+    this.conf = configuration;
+    this.maxSize = conf.getInt(GiraphConfiguration.MSG_SIZE,
+        GiraphConfiguration.MSG_SIZE_DEFAULT);
     this.maxMessagesPerFlushPut =
-        conf.getInt(GiraphJob.MAX_MESSAGES_PER_FLUSH_PUT,
-            GiraphJob.DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT);
+        conf.getInt(GiraphConfiguration.MAX_MESSAGES_PER_FLUSH_PUT,
+            GiraphConfiguration.DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT);
     if (BspUtils.getVertexCombinerClass(conf) == null) {
       this.combiner = null;
     } else {
@@ -466,19 +467,19 @@ public abstract class BasicRPCCommunicat
 
 
 
-    int numHandlers = conf.getInt(GiraphJob.RPC_NUM_HANDLERS,
-        GiraphJob.RPC_NUM_HANDLERS_DEFAULT);
+    int numHandlers = conf.getInt(GiraphConfiguration.RPC_NUM_HANDLERS,
+        GiraphConfiguration.RPC_NUM_HANDLERS_DEFAULT);
     if (numTasks < numHandlers) {
       numHandlers = numTasks;
     }
     this.jobToken = createJobToken();
     this.jobId = context.getJobID().toString();
 
-    int numWorkers = conf.getInt(GiraphJob.MAX_WORKERS, numTasks);
+    int numWorkers = conf.getInt(GiraphConfiguration.MAX_WORKERS, numTasks);
     // If the number of flush threads is unset, it is set to
     // the number of max workers - 1 or a minimum of 1.
     int numFlushThreads =
-        Math.max(conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+        Math.max(conf.getInt(GiraphConfiguration.MSG_NUM_FLUSH_THREADS,
             numWorkers - 1),
             1);
     this.executor = Executors.newFixedThreadPool(numFlushThreads);
@@ -490,16 +491,16 @@ public abstract class BasicRPCCommunicat
     int portIncrementConstant =
         (int) Math.pow(10, Math.ceil(Math.log10(numWorkers)));
     String bindAddress = localHostname;
-    int bindPort = conf.getInt(GiraphJob.RPC_INITIAL_PORT,
-        GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
+    int bindPort = conf.getInt(GiraphConfiguration.RPC_INITIAL_PORT,
+        GiraphConfiguration.RPC_INITIAL_PORT_DEFAULT) +
         taskId;
     int bindAttempts = 0;
     final int maxRpcPortBindAttempts =
-        conf.getInt(GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS,
-            GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT);
+        conf.getInt(GiraphConfiguration.MAX_RPC_PORT_BIND_ATTEMPTS,
+            GiraphConfiguration.MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT);
     final boolean failFirstPortBindingAttempt =
-        conf.getBoolean(GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT,
-            GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT);
+        conf.getBoolean(GiraphConfiguration.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT,
+            GiraphConfiguration.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT);
     while (bindAttempts < maxRpcPortBindAttempts) {
       this.myAddress = new InetSocketAddress(bindAddress, bindPort);
       if (failFirstPortBindingAttempt && bindAttempts == 0) {
@@ -1216,8 +1217,7 @@ public abstract class BasicRPCCommunicat
     // Resolve all graph mutations
     for (I vertexIndex : resolveVertexIndexSet) {
       VertexResolver<I, V, E, M> vertexResolver =
-          BspUtils.createVertexResolver(
-              conf, service.getGraphMapper().getGraphState());
+          conf.createVertexResolver(service.getGraphMapper().getGraphState());
       Vertex<I, V, E, M> originalVertex =
           service.getVertex(vertexIndex);
       Iterable<M> messages = inMessages.get(vertexIndex);

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java Tue Sep 25 17:40:18 2012
@@ -39,6 +39,8 @@ import org.apache.log4j.Logger;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+
 /*if[HADOOP_NON_SECURE]
 else[HADOOP_NON_SECURE]*/
 import org.apache.giraph.hadoop.BspPolicyProvider;
@@ -78,6 +80,7 @@ public class RPCCommunications<I extends
    * Constructor.
    *
    * @param context Context to be saved.
+   * @param configuration Configuration
    * @param service Server worker.
    * @param graphState Graph state from infrastructure.
    * @throws IOException
@@ -85,9 +88,10 @@ public class RPCCommunications<I extends
    */
   public RPCCommunications(Mapper<?, ?, ?, ?>.Context context,
       CentralizedServiceWorker<I, V, E, M> service,
+      ImmutableClassesGiraphConfiguration configuration,
       GraphState<I, V, E, M> graphState) throws
       IOException, InterruptedException {
-    super(context, service);
+    super(context, configuration, service);
   }
 
   /**

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java Tue Sep 25 17:40:18 2012
@@ -24,9 +24,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.VertexCombiner;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -56,11 +55,11 @@ public class SendMessageCache<I extends 
    *
    * @param conf Configuration used for instantiating the combiner.
    */
-  public SendMessageCache(Configuration conf) {
-    if (BspUtils.getVertexCombinerClass(conf) == null) {
+  public SendMessageCache(ImmutableClassesGiraphConfiguration conf) {
+    if (conf.getVertexCombinerClass() == null) {
       this.combiner = null;
     } else {
-      this.combiner = BspUtils.createVertexCombiner(conf);
+      this.combiner = conf.createVertexCombiner();
     }
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java Tue Sep 25 17:40:18 2012
@@ -18,14 +18,14 @@
 
 package org.apache.giraph.comm;
 
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.messages.MessageStoreByPartition;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.graph.partition.DiskBackedPartitionStore;
 import org.apache.giraph.graph.partition.PartitionStore;
 import org.apache.giraph.graph.partition.SimplePartitionStore;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -71,15 +71,16 @@ public class ServerData<I extends Writab
    * @param configuration Configuration
    * @param messageStoreFactory Factory for message stores
    */
-  public ServerData(Configuration configuration,
-                    MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
-                        messageStoreFactory) {
+  public ServerData(
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
+          messageStoreFactory) {
 
     this.messageStoreFactory = messageStoreFactory;
     currentMessageStore = messageStoreFactory.newStore();
     incomingMessageStore = messageStoreFactory.newStore();
-    if (configuration.getBoolean(GiraphJob.USE_OUT_OF_CORE_GRAPH,
-        GiraphJob.USE_OUT_OF_CORE_GRAPH_DEFAULT)) {
+    if (configuration.getBoolean(GiraphConfiguration.USE_OUT_OF_CORE_GRAPH,
+        GiraphConfiguration.USE_OUT_OF_CORE_GRAPH_DEFAULT)) {
       partitionStore = new DiskBackedPartitionStore<I, V, E, M>(configuration);
     } else {
       partitionStore = new SimplePartitionStore<I, V, E, M>(configuration);

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java Tue Sep 25 17:40:18 2012
@@ -18,10 +18,9 @@
 
 package org.apache.giraph.comm.messages;
 
-import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.VertexCombiner;
 import org.apache.giraph.utils.CollectionUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -55,7 +54,7 @@ public class DiskBackedMessageStore<I ex
   /** In memory message map */
   private volatile ConcurrentNavigableMap<I, Collection<M>> inMemoryMessages;
   /** Hadoop configuration */
-  private final Configuration config;
+  private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
   /** Combiner for messages */
   private final VertexCombiner<I, M> combiner;
   /** Counter for number of messages in memory */
@@ -76,7 +75,7 @@ public class DiskBackedMessageStore<I ex
    * @param fileStoreFactory Factory for creating file stores when flushing
    */
   public DiskBackedMessageStore(VertexCombiner<I, M> combiner,
-      Configuration config,
+      ImmutableClassesGiraphConfiguration<I, ?, ?, M> config,
       MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
     inMemoryMessages = new ConcurrentSkipListMap<I, Collection<M>>();
     this.config = config;
@@ -211,7 +210,7 @@ public class DiskBackedMessageStore<I ex
     // read destination vertices
     int numVertices = in.readInt();
     for (int v = 0; v < numVertices; v++) {
-      I vertexId = BspUtils.<I>createVertexId(config);
+      I vertexId = (I) config.createVertexId();
       vertexId.readFields(in);
       destinationVertices.add(vertexId);
     }
@@ -219,13 +218,13 @@ public class DiskBackedMessageStore<I ex
     // read in memory map
     int mapSize = in.readInt();
     for (int m = 0; m < mapSize; m++) {
-      I vertexId = BspUtils.<I>createVertexId(config);
+      I vertexId = config.createVertexId();
       vertexId.readFields(in);
       int numMessages = in.readInt();
       numberOfMessagesInMemory.addAndGet(numMessages);
       List<M> messages = Lists.newArrayList();
       for (int i = 0; i < numMessages; i++) {
-        M message = BspUtils.<M>createMessageValue(config);
+        M message = config.createMessageValue();
         message.readFields(in);
         messages.add(message);
       }
@@ -254,7 +253,7 @@ public class DiskBackedMessageStore<I ex
    */
   public static <I extends WritableComparable, M extends Writable>
   MessageStoreFactory<I, M, FlushableMessageStore<I, M>> newFactory(
-      Configuration config,
+      ImmutableClassesGiraphConfiguration<I, ?, ?, M> config,
       MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
     return new Factory<I, M>(config, fileStoreFactory);
   }
@@ -269,7 +268,7 @@ public class DiskBackedMessageStore<I ex
       M extends Writable> implements MessageStoreFactory<I, M,
       FlushableMessageStore<I, M>> {
     /** Hadoop configuration */
-    private final Configuration config;
+    private final ImmutableClassesGiraphConfiguration config;
     /** Combiner for messages */
     private final VertexCombiner<I, M> combiner;
     /** Factory for creating message stores for partitions */
@@ -281,13 +280,13 @@ public class DiskBackedMessageStore<I ex
      * @param fileStoreFactory Factory for creating message stores for
      *                         partitions
      */
-    public Factory(Configuration config,
+    public Factory(ImmutableClassesGiraphConfiguration config,
         MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
       this.config = config;
-      if (BspUtils.getVertexCombinerClass(config) == null) {
+      if (config.getVertexCombinerClass() == null) {
         combiner = null;
       } else {
-        combiner = BspUtils.createVertexCombiner(config);
+        combiner = config.createVertexCombiner();
       }
       this.fileStoreFactory = fileStoreFactory;
     }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java Tue Sep 25 17:40:18 2012
@@ -18,9 +18,8 @@
 
 package org.apache.giraph.comm.messages;
 
-import org.apache.giraph.graph.BspUtils;
-import org.apache.giraph.graph.GiraphJob;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -61,7 +60,7 @@ public class SequentialFileMessageStore<
   /** File in which we store data */
   private final File file;
   /** Configuration which we need for reading data */
-  private final Configuration config;
+  private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
   /** Buffer size to use when reading and writing files */
   private final int bufferSize;
   /** File input stream */
@@ -79,7 +78,9 @@ public class SequentialFileMessageStore<
    * @param fileName   File in which we want to store messages
    * @throws IOException
    */
-  public SequentialFileMessageStore(Configuration config, int bufferSize,
+  public SequentialFileMessageStore(
+      ImmutableClassesGiraphConfiguration<I, ?, ?, M> config,
+      int bufferSize,
       String fileName) {
     this.config = config;
     this.bufferSize = bufferSize;
@@ -248,7 +249,7 @@ public class SequentialFileMessageStore<
     if (verticesLeft == 0) {
       return null;
     }
-    currentVertexId = BspUtils.<I>createVertexId(config);
+    currentVertexId = config.createVertexId();
     currentVertexId.readFields(in);
     return currentVertexId;
   }
@@ -263,7 +264,7 @@ public class SequentialFileMessageStore<
     int messagesSize = in.readInt();
     ArrayList<M> messages = Lists.newArrayList();
     for (int i = 0; i < messagesSize; i++) {
-      M message = BspUtils.<M>createMessageValue(config);
+      M message = config.createMessageValue();
       message.readFields(in);
       messages.add(message);
     }
@@ -307,7 +308,7 @@ public class SequentialFileMessageStore<
    */
   public static <I extends WritableComparable, M extends Writable>
   MessageStoreFactory<I, M, BasicMessageStore<I, M>> newFactory(
-      Configuration config) {
+      ImmutableClassesGiraphConfiguration config) {
     return new Factory<I, M>(config);
   }
 
@@ -321,7 +322,7 @@ public class SequentialFileMessageStore<
       M extends Writable>
       implements MessageStoreFactory<I, M, BasicMessageStore<I, M>> {
     /** Hadoop configuration */
-    private final Configuration config;
+    private final ImmutableClassesGiraphConfiguration config;
     /** Directory in which we'll keep necessary files */
     private final String directory;
     /** Buffer size to use when reading and writing */
@@ -329,14 +330,19 @@ public class SequentialFileMessageStore<
     /** Counter for created message stores */
     private final AtomicInteger storeCounter;
 
-    /** @param config Hadoop configuration */
-    public Factory(Configuration config) {
+    /**
+     * Constructor.
+     *
+     * @param config Hadoop configuration
+     */
+    public Factory(ImmutableClassesGiraphConfiguration config) {
       this.config = config;
       String jobId = config.get("mapred.job.id", "Unknown Job");
-      this.directory = config.get(GiraphJob.MESSAGES_DIRECTORY,
-          GiraphJob.MESSAGES_DIRECTORY_DEFAULT) + jobId + File.separator;
-      this.bufferSize = config.getInt(GiraphJob.MESSAGES_BUFFER_SIZE,
-          GiraphJob.MESSAGES_BUFFER_SIZE_DEFAULT);
+      this.directory = config.get(GiraphConfiguration.MESSAGES_DIRECTORY,
+          GiraphConfiguration.MESSAGES_DIRECTORY_DEFAULT) + jobId +
+          File.separator;
+      this.bufferSize = config.getInt(GiraphConfiguration.MESSAGES_BUFFER_SIZE,
+          GiraphConfiguration.MESSAGES_BUFFER_SIZE_DEFAULT);
       storeCounter = new AtomicInteger();
       new File(directory).mkdirs();
     }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java Tue Sep 25 17:40:18 2012
@@ -18,11 +18,10 @@
 
 package org.apache.giraph.comm.messages;
 
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.VertexCombiner;
 import org.apache.giraph.utils.CollectionUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -38,9 +37,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentMap;
+import org.apache.log4j.Logger;
 
 /**
- * Simple in memory message store implemented with a map
+ * Simple in memory message store implemented with a two level concurrent
+ * hash map.
  *
  * @param <I> Vertex id
  * @param <V> Vertex data
@@ -50,6 +51,8 @@ import java.util.concurrent.ConcurrentMa
 public class SimpleMessageStore<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     implements MessageStoreByPartition<I, M> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(SimpleMessageStore.class);
   /** Service worker */
   private final CentralizedServiceWorker<I, V, E, M> service;
   /**
@@ -58,7 +61,7 @@ public class SimpleMessageStore<I extend
    */
   private final ConcurrentMap<Integer, ConcurrentMap<I, Collection<M>>> map;
   /** Hadoop configuration */
-  private final Configuration config;
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> config;
   /** Combiner for messages */
   private final VertexCombiner<I, M> combiner;
 
@@ -68,7 +71,8 @@ public class SimpleMessageStore<I extend
    * @param config   Hadoop configuration
    */
   SimpleMessageStore(CentralizedServiceWorker<I, V, E, M> service,
-      VertexCombiner<I, M> combiner, Configuration config) {
+      VertexCombiner<I, M> combiner,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> config) {
     this.service = service;
     map = Maps.newConcurrentMap();
     this.combiner = combiner;
@@ -216,12 +220,12 @@ public class SimpleMessageStore<I extend
       ConcurrentMap<I, Collection<M>> partitionMap = Maps.newConcurrentMap();
       int numVertices = in.readInt();
       for (int v = 0; v < numVertices; v++) {
-        I vertexId = BspUtils.<I>createVertexId(config);
+        I vertexId = config.createVertexId();
         vertexId.readFields(in);
         int numMessages = in.readInt();
         List<M> messages = Lists.newArrayList();
         for (int m = 0; m < numMessages; m++) {
-          M message = BspUtils.<M>createMessageValue(config);
+          M message = config.createMessageValue();
           message.readFields(in);
           messages.add(message);
         }
@@ -255,7 +259,8 @@ public class SimpleMessageStore<I extend
   public static <I extends WritableComparable, V extends Writable,
       E extends Writable, M extends Writable>
   MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
-      CentralizedServiceWorker<I, V, E, M> service, Configuration config) {
+      CentralizedServiceWorker<I, V, E, M> service,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> config) {
     return new Factory<I, V, E, M>(service, config);
   }
 
@@ -273,7 +278,7 @@ public class SimpleMessageStore<I extend
     /** Service worker */
     private final CentralizedServiceWorker<I, V, E, M> service;
     /** Hadoop configuration */
-    private final Configuration config;
+    private final ImmutableClassesGiraphConfiguration<I, V, E, M> config;
     /** Combiner for messages */
     private final VertexCombiner<I, M> combiner;
 
@@ -282,13 +287,13 @@ public class SimpleMessageStore<I extend
      * @param config  Hadoop configuration
      */
     public Factory(CentralizedServiceWorker<I, V, E, M> service,
-        Configuration config) {
+        ImmutableClassesGiraphConfiguration<I, V, E, M> config) {
       this.service = service;
       this.config = config;
-      if (BspUtils.getVertexCombinerClass(config) == null) {
+      if (config.getVertexCombinerClass() == null) {
         combiner = null;
       } else {
-        combiner = BspUtils.createVertexCombiner(config);
+        combiner = config.createVertexCombiner();
       }
     }