You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/09/25 19:40:21 UTC

svn commit: r1390014 [2/4] - in /giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/comm/messages/ src/...

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java Tue Sep 25 17:40:18 2012
@@ -31,6 +31,8 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.netty.handler.AddressRequestIdGenerator;
 import org.apache.giraph.comm.netty.handler.ClientRequestId;
 import org.apache.giraph.comm.netty.handler.RequestServerHandler;
@@ -38,9 +40,7 @@ import org.apache.giraph.comm.netty.hand
 import org.apache.giraph.comm.netty.handler.RequestEncoder;
 import org.apache.giraph.comm.netty.handler.RequestInfo;
 import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.utils.TimedLogger;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 import org.jboss.netty.bootstrap.ClientBootstrap;
@@ -125,17 +125,20 @@ public class NettyClient {
    * Only constructor
    *
    * @param context Context for progress
+   * @param conf Configuration
    */
-  public NettyClient(Mapper<?, ?, ?, ?>.Context context) {
+  public NettyClient(Mapper<?, ?, ?, ?>.Context context,
+                     final ImmutableClassesGiraphConfiguration conf) {
     this.context = context;
-    final Configuration conf = context.getConfiguration();
     this.channelsPerServer = conf.getInt(
-        GiraphJob.CHANNELS_PER_SERVER,
-        GiraphJob.DEFAULT_CHANNELS_PER_SERVER);
-    sendBufferSize = conf.getInt(GiraphJob.CLIENT_SEND_BUFFER_SIZE,
-        GiraphJob.DEFAULT_CLIENT_SEND_BUFFER_SIZE);
-    receiveBufferSize = conf.getInt(GiraphJob.CLIENT_RECEIVE_BUFFER_SIZE,
-        GiraphJob.DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE);
+        GiraphConfiguration.CHANNELS_PER_SERVER,
+        GiraphConfiguration.DEFAULT_CHANNELS_PER_SERVER);
+    sendBufferSize = conf.getInt(
+        GiraphConfiguration.CLIENT_SEND_BUFFER_SIZE,
+        GiraphConfiguration.DEFAULT_CLIENT_SEND_BUFFER_SIZE);
+    receiveBufferSize = conf.getInt(
+        GiraphConfiguration.CLIENT_RECEIVE_BUFFER_SIZE,
+        GiraphConfiguration.DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE);
 
     limitNumberOfOpenRequests = conf.getBoolean(
         LIMIT_NUMBER_OF_OPEN_REQUESTS,
@@ -153,22 +156,22 @@ public class NettyClient {
     }
 
     maxRequestMilliseconds = conf.getInt(
-        GiraphJob.MAX_REQUEST_MILLISECONDS,
-        GiraphJob.MAX_REQUEST_MILLISECONDS_DEFAULT);
+        GiraphConfiguration.MAX_REQUEST_MILLISECONDS,
+        GiraphConfiguration.MAX_REQUEST_MILLISECONDS_DEFAULT);
 
     maxConnectionFailures = conf.getInt(
-        GiraphJob.NETTY_MAX_CONNECTION_FAILURES,
-        GiraphJob.NETTY_MAX_CONNECTION_FAILURES_DEFAULT);
+        GiraphConfiguration.NETTY_MAX_CONNECTION_FAILURES,
+        GiraphConfiguration.NETTY_MAX_CONNECTION_FAILURES_DEFAULT);
 
     maxReconnectionFailures = conf.getInt(
-        GiraphJob.MAX_RECONNECT_ATTEMPTS,
-        GiraphJob.MAX_RECONNECT_ATTEMPTS_DEFAULT);
+        GiraphConfiguration.MAX_RECONNECT_ATTEMPTS,
+        GiraphConfiguration.MAX_RECONNECT_ATTEMPTS_DEFAULT);
 
     waitingRequestMsecs = conf.getInt(
-        GiraphJob.WAITING_REQUEST_MSECS,
-        GiraphJob.WAITING_REQUEST_MSECS_DEFAULT);
+        GiraphConfiguration.WAITING_REQUEST_MSECS,
+        GiraphConfiguration.WAITING_REQUEST_MSECS_DEFAULT);
 
-    int maxThreads = conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+    int maxThreads = conf.getInt(GiraphConfiguration.MSG_NUM_FLUSH_THREADS,
         NettyServer.MAXIMUM_THREAD_POOL_SIZE_DEFAULT);
     clientRequestIdRequestInfoMap =
         new MapMaker().concurrencyLevel(maxThreads).makeMap();
@@ -436,6 +439,7 @@ public class NettyClient {
           "have a previous request id = " + request.getRequestId() + ", " +
           "request info of " + oldRequestInfo);
     }
+
     ChannelFuture writeFuture = channel.write(request);
     newRequestInfo.setWriteFuture(writeFuture);
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java Tue Sep 25 17:40:18 2012
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.comm.netty;
 
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.MasterClient;
 import org.apache.giraph.graph.WorkerInfo;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -43,9 +44,11 @@ public class NettyMasterClient implement
    * Constructor
    *
    * @param context Context from mapper
+   * @param configuration Configuration
    */
-  public NettyMasterClient(Mapper<?, ?, ?, ?>.Context context) {
-    this.nettyClient = new NettyClient(context);
+  public NettyMasterClient(Mapper<?, ?, ?, ?>.Context context,
+                           ImmutableClassesGiraphConfiguration configuration) {
+    this.nettyClient = new NettyClient(context, configuration);
     workers = Lists.newArrayList();
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java Tue Sep 25 17:40:18 2012
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.comm.netty;
 
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.MasterClient;
 import org.apache.giraph.comm.MasterClientServer;
 import org.apache.giraph.comm.MasterServer;
@@ -39,10 +40,13 @@ public class NettyMasterClientServer imp
    * Constructor
    *
    * @param context Mapper context
+   * @param configuration Configuration
    */
-  public NettyMasterClientServer(Mapper<?, ?, ?, ?>.Context context) {
-    client = new NettyMasterClient(context);
-    server = new NettyMasterServer(context.getConfiguration());
+  public NettyMasterClientServer(
+      Mapper<?, ?, ?, ?>.Context context,
+      ImmutableClassesGiraphConfiguration configuration) {
+    client = new NettyMasterClient(context, configuration);
+    server = new NettyMasterServer(configuration);
   }
 
   @Override

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java Tue Sep 25 17:40:18 2012
@@ -18,9 +18,9 @@
 
 package org.apache.giraph.comm.netty;
 
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.netty.handler.MasterRequestServerHandler;
 import org.apache.giraph.comm.MasterServer;
-import org.apache.hadoop.conf.Configuration;
 
 import java.net.InetSocketAddress;
 
@@ -36,7 +36,7 @@ public class NettyMasterServer implement
    *
    * @param conf Hadoop configuration
    */
-  public NettyMasterServer(Configuration conf) {
+  public NettyMasterServer(ImmutableClassesGiraphConfiguration conf) {
     nettyServer = new NettyServer(conf,
         new MasterRequestServerHandler.Factory());
     nettyServer.start();

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java Tue Sep 25 17:40:18 2012
@@ -24,11 +24,11 @@ import java.net.UnknownHostException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.netty.handler.WorkerRequestReservedMap;
 import org.apache.giraph.comm.netty.handler.RequestDecoder;
-import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.comm.netty.handler.RequestServerHandler;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.channel.Channel;
@@ -53,7 +53,7 @@ public class NettyServer {
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(NettyServer.class);
   /** Configuration */
-  private final Configuration conf;
+  private final ImmutableClassesGiraphConfiguration conf;
   /** Factory of channels */
   private ChannelFactory channelFactory;
   /** Accepted channels */
@@ -89,15 +89,17 @@ public class NettyServer {
    * @param conf Configuration to use
    * @param requestServerHandlerFactory Factory for request handlers
    */
-  public NettyServer(Configuration conf,
+  public NettyServer(ImmutableClassesGiraphConfiguration conf,
       RequestServerHandler.Factory requestServerHandlerFactory) {
     this.conf = conf;
     this.requestServerHandlerFactory = requestServerHandlerFactory;
 
-    sendBufferSize = conf.getInt(GiraphJob.SERVER_SEND_BUFFER_SIZE,
-        GiraphJob.DEFAULT_SERVER_SEND_BUFFER_SIZE);
-    receiveBufferSize = conf.getInt(GiraphJob.SERVER_RECEIVE_BUFFER_SIZE,
-        GiraphJob.DEFAULT_SERVER_RECEIVE_BUFFER_SIZE);
+    sendBufferSize = conf.getInt(
+        GiraphConfiguration.SERVER_SEND_BUFFER_SIZE,
+        GiraphConfiguration.DEFAULT_SERVER_SEND_BUFFER_SIZE);
+    receiveBufferSize = conf.getInt(
+        GiraphConfiguration.SERVER_RECEIVE_BUFFER_SIZE,
+        GiraphConfiguration.DEFAULT_SERVER_RECEIVE_BUFFER_SIZE);
 
     workerRequestReservedMap = new WorkerRequestReservedMap(conf);
 
@@ -113,11 +115,12 @@ public class NettyServer {
     } catch (UnknownHostException e) {
       throw new IllegalStateException("NettyServer: unable to get hostname");
     }
-    maximumPoolSize = conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+    maximumPoolSize = conf.getInt(GiraphConfiguration.MSG_NUM_FLUSH_THREADS,
                                   MAXIMUM_THREAD_POOL_SIZE_DEFAULT);
 
-    tcpBacklog = conf.getInt(GiraphJob.TCP_BACKLOG,
-        conf.getInt(GiraphJob.MAX_WORKERS, GiraphJob.TCP_BACKLOG_DEFAULT));
+    tcpBacklog = conf.getInt(GiraphConfiguration.TCP_BACKLOG,
+        conf.getInt(GiraphConfiguration.MAX_WORKERS,
+            GiraphConfiguration.TCP_BACKLOG_DEFAULT));
 
     channelFactory = new NioServerSocketChannelFactory(
         bossExecutorService,
@@ -151,19 +154,19 @@ public class NettyServer {
     int taskId = conf.getInt("mapred.task.partition", -1);
     int numTasks = conf.getInt("mapred.map.tasks", 1);
     // number of workers + 1 for master
-    int numServers = conf.getInt(GiraphJob.MAX_WORKERS, numTasks) + 1;
+    int numServers = conf.getInt(GiraphConfiguration.MAX_WORKERS, numTasks) + 1;
     int portIncrementConstant =
         (int) Math.pow(10, Math.ceil(Math.log10(numServers)));
-    int bindPort = conf.getInt(GiraphJob.RPC_INITIAL_PORT,
-        GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
+    int bindPort = conf.getInt(GiraphConfiguration.RPC_INITIAL_PORT,
+        GiraphConfiguration.RPC_INITIAL_PORT_DEFAULT) +
         taskId;
     int bindAttempts = 0;
     final int maxRpcPortBindAttempts =
-        conf.getInt(GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS,
-            GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT);
+        conf.getInt(GiraphConfiguration.MAX_RPC_PORT_BIND_ATTEMPTS,
+            GiraphConfiguration.MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT);
     final boolean failFirstPortBindingAttempt =
-        conf.getBoolean(GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT,
-            GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT);
+        conf.getBoolean(GiraphConfiguration.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT,
+            GiraphConfiguration.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT);
 
     // Simple handling of port collisions on the same machine while
     // preserving debugability from the port number alone.

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java Tue Sep 25 17:40:18 2012
@@ -20,6 +20,8 @@ package org.apache.giraph.comm.netty;
 
 import com.google.common.collect.Sets;
 import java.util.Set;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.SendMessageCache;
 import org.apache.giraph.comm.SendMutationsCache;
@@ -33,13 +35,11 @@ import org.apache.giraph.comm.requests.S
 import org.apache.giraph.comm.requests.WorkerRequest;
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.graph.WorkerInfo;
 import org.apache.giraph.graph.partition.Partition;
 import org.apache.giraph.graph.partition.PartitionOwner;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -71,7 +71,7 @@ public class NettyWorkerClient<I extends
   private static final Logger LOG =
     Logger.getLogger(NettyWorkerClient.class);
   /** Hadoop configuration */
-  private final Configuration conf;
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
   /** Netty client that does that actual I/O */
   private final NettyClient nettyClient;
   /** Centralized service, needed to get vertex ranges */
@@ -104,22 +104,27 @@ public class NettyWorkerClient<I extends
    * Only constructor.
    *
    * @param context Context from mapper
+   * @param configuration Configuration
    * @param service Used to get partition mapping
    * @param serverData Server data (used for local requests)
    */
-  public NettyWorkerClient(Mapper<?, ?, ?, ?>.Context context,
-                           CentralizedServiceWorker<I, V, E, M> service,
-                           ServerData<I, V, E, M> serverData) {
-    this.nettyClient = new NettyClient(context);
-    this.conf = context.getConfiguration();
+  public NettyWorkerClient(
+      Mapper<?, ?, ?, ?>.Context context,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      CentralizedServiceWorker<I, V, E, M> service,
+      ServerData<I, V, E, M> serverData) {
+    this.nettyClient = new NettyClient(context, configuration);
+    this.conf = configuration;
     this.service = service;
-    maxMessagesPerPartition = conf.getInt(GiraphJob.MSG_SIZE,
-        GiraphJob.MSG_SIZE_DEFAULT);
-    maxMutationsPerPartition = conf.getInt(GiraphJob.MAX_MUTATIONS_PER_REQUEST,
-        GiraphJob.MAX_MUTATIONS_PER_REQUEST_DEFAULT);
+    maxMessagesPerPartition = conf.getInt(
+        GiraphConfiguration.MSG_SIZE,
+        GiraphConfiguration.MSG_SIZE_DEFAULT);
+    maxMutationsPerPartition = conf.getInt(
+        GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST,
+        GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST_DEFAULT);
     maxResolveAddressAttempts = conf.getInt(
-        GiraphJob.MAX_RESOLVE_ADDRESS_ATTEMPTS,
-        GiraphJob.MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT);
+        GiraphConfiguration.MAX_RESOLVE_ADDRESS_ATTEMPTS,
+        GiraphConfiguration.MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT);
     sendMessageCache = new SendMessageCache<I, M>(conf);
     sendMutationsCache = new SendMutationsCache<I, V, E, M>();
     this.serverData = serverData;
@@ -157,8 +162,8 @@ public class NettyWorkerClient<I extends
             partitionOwner.getPartitionId()));
       }
     }
-    boolean useNetty = conf.getBoolean(GiraphJob.USE_NETTY,
-        GiraphJob.USE_NETTY_DEFAULT);
+    boolean useNetty = conf.getBoolean(GiraphConfiguration.USE_NETTY,
+        GiraphConfiguration.USE_NETTY_DEFAULT);
     if (useNetty) {
       addresses.add(service.getMasterInfo().getInetSocketAddress());
     }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java Tue Sep 25 17:40:18 2012
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.comm.netty;
 
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClient;
@@ -54,13 +55,18 @@ public class NettyWorkerClientServer<I e
    * Constructor.
    *
    * @param context Mapper context
+   * @param configuration Configuration
    * @param service Service for partition lookup
    */
-  public NettyWorkerClientServer(Mapper<?, ?, ?, ?>.Context context,
+  public NettyWorkerClientServer(
+      Mapper<?, ?, ?, ?>.Context context,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
       CentralizedServiceWorker<I, V, E, M> service) {
-    server = new NettyWorkerServer<I, V, E, M>(context.getConfiguration(),
+    server = new NettyWorkerServer<I, V, E, M>(
+        configuration,
         service);
-    client = new NettyWorkerClient<I, V, E, M>(context, service,
+    client = new NettyWorkerClient<I, V, E, M>(context,
+        configuration, service,
        ((NettyWorkerServer<I, V, E, M>) server).getServerData());
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java Tue Sep 25 17:40:18 2012
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.comm.netty;
 
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
@@ -30,13 +32,10 @@ import org.apache.giraph.comm.messages.M
 import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.comm.messages.SequentialFileMessageStore;
 import org.apache.giraph.comm.messages.SimpleMessageStore;
-import org.apache.giraph.graph.BspUtils;
-import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.graph.partition.Partition;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
@@ -62,7 +61,7 @@ public class NettyWorkerServer<I extends
   private static final Logger LOG =
     Logger.getLogger(NettyWorkerServer.class);
   /** Hadoop configuration */
-  private final Configuration conf;
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
   /** Service worker */
   private final CentralizedServiceWorker<I, V, E, M> service;
   /** Netty server that does that actual I/O */
@@ -76,20 +75,21 @@ public class NettyWorkerServer<I extends
    * @param conf Configuration
    * @param service Service to get partition mappings
    */
-  public NettyWorkerServer(Configuration conf,
+  public NettyWorkerServer(ImmutableClassesGiraphConfiguration conf,
       CentralizedServiceWorker<I, V, E, M> service) {
     this.conf = conf;
     this.service = service;
 
     boolean useOutOfCoreMessaging = conf.getBoolean(
-        GiraphJob.USE_OUT_OF_CORE_MESSAGES,
-        GiraphJob.USE_OUT_OF_CORE_MESSAGES_DEFAULT);
+        GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES,
+        GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES_DEFAULT);
     if (!useOutOfCoreMessaging) {
       serverData = new ServerData<I, V, E, M>(
           conf, SimpleMessageStore.newFactory(service, conf));
     } else {
-      int maxMessagesInMemory = conf.getInt(GiraphJob.MAX_MESSAGES_IN_MEMORY,
-          GiraphJob.MAX_MESSAGES_IN_MEMORY_DEFAULT);
+      int maxMessagesInMemory = conf.getInt(
+          GiraphConfiguration.MAX_MESSAGES_IN_MEMORY,
+          GiraphConfiguration.MAX_MESSAGES_IN_MEMORY_DEFAULT);
       MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory =
           SequentialFileMessageStore.newFactory(conf);
       MessageStoreFactory<I, M, FlushableMessageStore<I, M>>
@@ -143,8 +143,8 @@ public class NettyWorkerServer<I extends
     // Resolve all graph mutations
     for (I vertexIndex : resolveVertexIndexSet) {
       VertexResolver<I, V, E, M> vertexResolver =
-          BspUtils.createVertexResolver(
-              conf, service.getGraphMapper().getGraphState());
+          conf.createVertexResolver(
+              service.getGraphMapper().getGraphState());
       Vertex<I, V, E, M> originalVertex =
           service.getVertex(vertexIndex);
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java Tue Sep 25 17:40:18 2012
@@ -18,11 +18,12 @@
 
 package org.apache.giraph.comm.netty.handler;
 
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.netty.ByteCounter;
 import org.apache.giraph.comm.requests.RequestType;
 import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.giraph.utils.ReflectionUtils;
+
 import org.apache.log4j.Logger;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBufferInputStream;
@@ -38,7 +39,7 @@ public class RequestDecoder extends OneT
   private static final Logger LOG =
       Logger.getLogger(RequestDecoder.class);
   /** Configuration */
-  private final Configuration conf;
+  private final ImmutableClassesGiraphConfiguration conf;
   /** Byte counter to output */
   private final ByteCounter byteCounter;
 
@@ -48,7 +49,8 @@ public class RequestDecoder extends OneT
    * @param conf Configuration
    * @param byteCounter Keeps track of the decoded bytes
    */
-  public RequestDecoder(Configuration conf, ByteCounter byteCounter) {
+  public RequestDecoder(ImmutableClassesGiraphConfiguration conf,
+                        ByteCounter byteCounter) {
     this.conf = conf;
     this.byteCounter = byteCounter;
   }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java Tue Sep 25 17:40:18 2012
@@ -18,8 +18,8 @@
 
 package org.apache.giraph.comm.netty.handler;
 
+import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.graph.GiraphJob;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -62,8 +62,8 @@ public abstract class RequestServerHandl
       Configuration conf) {
     this.workerRequestReservedMap = workerRequestReservedMap;
     closeFirstRequest = conf.getBoolean(
-        GiraphJob.NETTY_SIMULATE_FIRST_REQUEST_CLOSED,
-        GiraphJob.NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT);
+        GiraphConfiguration.NETTY_SIMULATE_FIRST_REQUEST_CLOSED,
+        GiraphConfiguration.NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT);
     myWorkerId = conf.getInt("mapred.task.partition", -1);
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java Tue Sep 25 17:40:18 2012
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.comm.netty.handler;
 
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.GiraphConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -60,8 +60,8 @@ public class ResponseClientHandler exten
       Configuration conf) {
     this.workerIdOutstandingRequestMap = workerIdOutstandingRequestMap;
     dropFirstResponse = conf.getBoolean(
-        GiraphJob.NETTY_SIMULATE_FIRST_RESPONSE_FAILED,
-        GiraphJob.NETTY_SIMULATE_FIRST_RESPONSE_FAILED_DEFAULT);
+        GiraphConfiguration.NETTY_SIMULATE_FIRST_RESPONSE_FAILED,
+        GiraphConfiguration.NETTY_SIMULATE_FIRST_RESPONSE_FAILED_DEFAULT);
   }
 
   @Override

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestReservedMap.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestReservedMap.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestReservedMap.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestReservedMap.java Tue Sep 25 17:40:18 2012
@@ -21,9 +21,9 @@ package org.apache.giraph.comm.netty.han
 import com.google.common.collect.MapMaker;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.utils.IncreasingBitSet;
 import org.apache.giraph.comm.netty.NettyServer;
-import org.apache.giraph.graph.GiraphJob;
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -41,7 +41,7 @@ public class WorkerRequestReservedMap {
    */
   public WorkerRequestReservedMap(Configuration conf) {
     workerRequestReservedMap = new MapMaker().concurrencyLevel(
-        conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+        conf.getInt(GiraphConfiguration.MSG_NUM_FLUSH_THREADS,
             NettyServer.MAXIMUM_THREAD_POOL_SIZE_DEFAULT)).makeMap();
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMessagesRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMessagesRequest.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMessagesRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMessagesRequest.java Tue Sep 25 17:40:18 2012
@@ -19,7 +19,6 @@
 package org.apache.giraph.comm.requests;
 
 import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.graph.BspUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
@@ -46,7 +45,7 @@ import java.util.Map.Entry;
 @SuppressWarnings("rawtypes")
 public class SendPartitionMessagesRequest<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> extends
-    WritableRequest implements WorkerRequest<I, V, E, M> {
+    WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(SendPartitionMessagesRequest.class);
@@ -78,12 +77,12 @@ public class SendPartitionMessagesReques
     int vertexIdMessagesSize = input.readInt();
     vertexIdMessages = Maps.newHashMapWithExpectedSize(vertexIdMessagesSize);
     for (int i = 0; i < vertexIdMessagesSize; ++i) {
-      I vertexId = BspUtils.<I>createVertexId(getConf());
+      I vertexId = getConf().createVertexId();
       vertexId.readFields(input);
       int messageCount = input.readInt();
       List<M> messageList = Lists.newArrayListWithCapacity(messageCount);
       for (int j = 0; j < messageCount; ++j) {
-        M message = BspUtils.<M>createMessageValue(getConf());
+        M message = getConf().createMessageValue();
         message.readFields(input);
         messageList.add(message);
       }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java Tue Sep 25 17:40:18 2012
@@ -26,7 +26,6 @@ import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.VertexMutations;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -45,7 +44,7 @@ import com.google.common.collect.Maps;
 @SuppressWarnings("rawtypes")
 public class SendPartitionMutationsRequest<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> extends
-    WritableRequest implements WorkerRequest<I, V, E, M> {
+    WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(SendPartitionMutationsRequest.class);
@@ -78,7 +77,7 @@ public class SendPartitionMutationsReque
     int vertexIdMutationsSize = input.readInt();
     vertexIdMutations = Maps.newHashMapWithExpectedSize(vertexIdMutationsSize);
     for (int i = 0; i < vertexIdMutationsSize; ++i) {
-      I vertexId = BspUtils.<I>createVertexId(getConf());
+      I vertexId = getConf().createVertexId();
       vertexId.readFields(input);
       VertexMutations<I, V, E, M> vertexMutations =
           new VertexMutations<I, V, E, M>();

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java Tue Sep 25 17:40:18 2012
@@ -19,7 +19,6 @@
 package org.apache.giraph.comm.requests;
 
 import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -43,7 +42,7 @@ import java.util.Collection;
 @SuppressWarnings("rawtypes")
 public class SendVertexRequest<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> extends
-    WritableRequest implements WorkerRequest<I, V, E, M> {
+    WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(SendVertexRequest.class);
@@ -75,7 +74,7 @@ public class SendVertexRequest<I extends
     int verticesCount = input.readInt();
     vertices = Lists.newArrayListWithCapacity(verticesCount);
     for (int i = 0; i < verticesCount; ++i) {
-      Vertex<I, V, E, M> vertex = BspUtils.createVertex(getConf());
+      Vertex<I, V, E, M> vertex = getConf().createVertex();
       vertex.readFields(input);
       vertices.add(vertex);
     }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java Tue Sep 25 17:40:18 2012
@@ -21,16 +21,25 @@ package org.apache.giraph.comm.requests;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
 /**
  * Interface for requests to implement
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
  */
-public abstract class WritableRequest implements Writable, Configurable {
+public abstract class WritableRequest<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements Writable,
+    ImmutableClassesGiraphConfigurable<I, V, E, M> {
   /** Configuration */
-  private Configuration conf;
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
   /** Client id */
   private int clientId = -1;
   /** Request id */
@@ -74,12 +83,13 @@ public abstract class WritableRequest im
   abstract void writeRequest(DataOutput output) throws IOException;
 
   @Override
-  public final Configuration getConf() {
+  public final ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
     return conf;
   }
 
   @Override
-  public final void setConf(Configuration conf) {
+  public final void setConf(ImmutableClassesGiraphConfiguration<I, V,
+      E, M> conf) {
     this.conf = conf;
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java Tue Sep 25 17:40:18 2012
@@ -18,9 +18,9 @@
 
 package org.apache.giraph.examples;
 
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.BspInputSplit;
 import org.apache.giraph.graph.VertexReader;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -59,7 +59,7 @@ public abstract class GeneratedVertexRea
   /** Reverse the id order? */
   protected boolean reverseIdOrder;
   /** Saved configuration */
-  protected Configuration configuration = null;
+  protected ImmutableClassesGiraphConfiguration configuration = null;
 
   /**
    * Default constructor for reflection.
@@ -70,7 +70,8 @@ public abstract class GeneratedVertexRea
   @Override
   public final void initialize(InputSplit inputSplit,
       TaskAttemptContext context) throws IOException {
-    configuration = context.getConfiguration();
+    configuration = new ImmutableClassesGiraphConfiguration(
+        context.getConfiguration());
     totalRecords = configuration.getLong(
         GeneratedVertexReader.READER_VERTICES,
         GeneratedVertexReader.DEFAULT_READER_VERTICES);

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java Tue Sep 25 17:40:18 2012
@@ -46,9 +46,7 @@ import org.apache.log4j.Logger;
  * every iteration to verify that checkpoint restarting works.  Fault injection
  * can also test automated checkpoint restarts.
  */
-public class SimpleCheckpointVertex extends
-    EdgeListVertex<LongWritable, IntWritable, FloatWritable, FloatWritable>
-    implements Tool {
+public class SimpleCheckpointVertex implements Tool {
   /** Which superstep to cause the worker to fail */
   public static final int FAULTING_SUPERSTEP = 4;
   /** Vertex id to fault on */
@@ -65,58 +63,64 @@ public class SimpleCheckpointVertex exte
   /** Configuration */
   private Configuration conf;
 
-  @Override
-  public void compute(Iterable<FloatWritable> messages) {
-    SimpleCheckpointVertexWorkerContext workerContext =
-        (SimpleCheckpointVertexWorkerContext) getWorkerContext();
-
-    boolean enableFault = workerContext.getEnableFault();
-    int supersteps = workerContext.getSupersteps();
-
-    if (enableFault && (getSuperstep() == FAULTING_SUPERSTEP) &&
-        (getContext().getTaskAttemptID().getId() == 0) &&
-        (getId().get() == FAULTING_VERTEX_ID)) {
-      LOG.info("compute: Forced a fault on the first " +
-          "attempt of superstep " +
-          FAULTING_SUPERSTEP + " and vertex id " +
-          FAULTING_VERTEX_ID);
-      System.exit(-1);
-    }
-    if (getSuperstep() > supersteps) {
-      voteToHalt();
-      return;
-    }
-    long sumAgg = this.<LongWritable>getAggregatedValue(
-        LongSumAggregator.class.getName()).get();
-    LOG.info("compute: " + sumAgg);
-    aggregate(LongSumAggregator.class.getName(),
-        new LongWritable(getId().get()));
-    LOG.info("compute: sum = " + sumAgg +
-        " for vertex " + getId());
-    float msgValue = 0.0f;
-    for (FloatWritable message : messages) {
-      float curMsgValue = message.get();
-      msgValue += curMsgValue;
-      LOG.info("compute: got msgValue = " + curMsgValue +
-          " for vertex " + getId() +
-          " on superstep " + getSuperstep());
-    }
-    int vertexValue = getValue().get();
-    setValue(new IntWritable(vertexValue + (int) msgValue));
-    LOG.info("compute: vertex " + getId() +
-        " has value " + getValue() +
-        " on superstep " + getSuperstep());
-    for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
-      FloatWritable newEdgeValue = new FloatWritable(edge.getValue().get() +
-          (float) vertexValue);
+  /**
+   * Actual computation.
+   */
+  public static class SimpleCheckpointComputation extends
+      EdgeListVertex<LongWritable, IntWritable, FloatWritable, FloatWritable> {
+    @Override
+    public void compute(Iterable<FloatWritable> messages) {
+      SimpleCheckpointVertexWorkerContext workerContext =
+          (SimpleCheckpointVertexWorkerContext) getWorkerContext();
+
+      boolean enableFault = workerContext.getEnableFault();
+      int supersteps = workerContext.getSupersteps();
+
+      if (enableFault && (getSuperstep() == FAULTING_SUPERSTEP) &&
+          (getContext().getTaskAttemptID().getId() == 0) &&
+          (getId().get() == FAULTING_VERTEX_ID)) {
+        LOG.info("compute: Forced a fault on the first " +
+            "attempt of superstep " +
+            FAULTING_SUPERSTEP + " and vertex id " +
+            FAULTING_VERTEX_ID);
+        System.exit(-1);
+      }
+      if (getSuperstep() > supersteps) {
+        voteToHalt();
+        return;
+      }
+      long sumAgg = this.<LongWritable>getAggregatedValue(
+          LongSumAggregator.class.getName()).get();
+      LOG.info("compute: " + sumAgg);
+      aggregate(LongSumAggregator.class.getName(),
+          new LongWritable(getId().get()));
+      LOG.info("compute: sum = " + sumAgg +
+          " for vertex " + getId());
+      float msgValue = 0.0f;
+      for (FloatWritable message : messages) {
+        float curMsgValue = message.get();
+        msgValue += curMsgValue;
+        LOG.info("compute: got msgValue = " + curMsgValue +
+            " for vertex " + getId() +
+            " on superstep " + getSuperstep());
+      }
+      int vertexValue = getValue().get();
+      setValue(new IntWritable(vertexValue + (int) msgValue));
       LOG.info("compute: vertex " + getId() +
-          " sending edgeValue " + edge.getValue() +
-          " vertexValue " + vertexValue +
-          " total " + newEdgeValue +
-              " to vertex " + edge.getTargetVertexId() +
-              " on superstep " + getSuperstep());
-      addEdge(edge.getTargetVertexId(), newEdgeValue);
-      sendMessage(edge.getTargetVertexId(), newEdgeValue);
+          " has value " + getValue() +
+          " on superstep " + getSuperstep());
+      for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
+        FloatWritable newEdgeValue = new FloatWritable(edge.getValue().get() +
+            (float) vertexValue);
+        LOG.info("compute: vertex " + getId() +
+            " sending edgeValue " + edge.getValue() +
+            " vertexValue " + vertexValue +
+            " total " + newEdgeValue +
+            " to vertex " + edge.getTargetVertexId() +
+            " on superstep " + getSuperstep());
+        addEdge(edge.getTargetVertexId(), newEdgeValue);
+        sendMessage(edge.getTargetVertexId(), newEdgeValue);
+      }
     }
   }
 
@@ -212,14 +216,19 @@ public class SimpleCheckpointVertex exte
     }
 
     GiraphJob bspJob = new GiraphJob(getConf(), getClass().getName());
-    bspJob.setVertexClass(getClass());
-    bspJob.setVertexInputFormatClass(GeneratedVertexInputFormat.class);
-    bspJob.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
-    bspJob.setWorkerContextClass(SimpleCheckpointVertexWorkerContext.class);
-    bspJob.setMasterComputeClass(SimpleCheckpointVertexMasterCompute.class);
+    bspJob.getConfiguration().setVertexClass(SimpleCheckpointComputation.class);
+    bspJob.getConfiguration().setVertexInputFormatClass(
+        GeneratedVertexInputFormat.class);
+    bspJob.getConfiguration().setVertexOutputFormatClass(
+        IdWithValueTextOutputFormat.class);
+    bspJob.getConfiguration().setWorkerContextClass(
+        SimpleCheckpointVertexWorkerContext.class);
+    bspJob.getConfiguration().setMasterComputeClass(
+        SimpleCheckpointVertexMasterCompute.class);
     int minWorkers = Integer.parseInt(cmd.getOptionValue('w'));
     int maxWorkers = Integer.parseInt(cmd.getOptionValue('w'));
-    bspJob.setWorkerConfiguration(minWorkers, maxWorkers, 100.0f);
+    bspJob.getConfiguration().setWorkerConfiguration(
+        minWorkers, maxWorkers, 100.0f);
 
     FileOutputFormat.setOutputPath(bspJob.getInternalJob(),
                                    new Path(cmd.getOptionValue('o')));

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java Tue Sep 25 17:40:18 2012
@@ -21,7 +21,6 @@ package org.apache.giraph.examples;
 import org.apache.giraph.aggregators.DoubleMaxAggregator;
 import org.apache.giraph.aggregators.DoubleMinAggregator;
 import org.apache.giraph.aggregators.LongSumAggregator;
-import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.DefaultMasterCompute;
 import org.apache.giraph.graph.LongDoubleFloatDoubleVertex;
 import org.apache.giraph.graph.Vertex;
@@ -183,10 +182,9 @@ public class SimplePageRankVertex extend
 
     @Override
     public Vertex<LongWritable, DoubleWritable,
-        FloatWritable, DoubleWritable>
-    getCurrentVertex() throws IOException {
+        FloatWritable, DoubleWritable> getCurrentVertex() throws IOException {
       Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
-      vertex = BspUtils.createVertex(configuration);
+          vertex = configuration.createVertex();
 
       LongWritable vertexId = new LongWritable(
           (inputSplit.getSplitIndex() * totalRecords) + recordsRead);

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java Tue Sep 25 17:40:18 2012
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.examples;
 
-import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexReader;
@@ -70,8 +69,7 @@ public class SimpleSuperstepVertex exten
         IntWritable> getCurrentVertex()
       throws IOException, InterruptedException {
       Vertex<LongWritable, IntWritable, FloatWritable, IntWritable> vertex =
-        BspUtils.<LongWritable, IntWritable, FloatWritable,
-        IntWritable>createVertex(configuration);
+          configuration.createVertex();
       long tmpId = reverseIdOrder ?
           ((inputSplit.getSplitIndex() + 1) * totalRecords) -
           recordsRead - 1 :

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java Tue Sep 25 17:40:18 2012
@@ -42,26 +42,43 @@ import java.io.IOException;
  * emit worker data to HDFS during a graph
  * computation.
  */
-public class SimpleVertexWithWorkerContext extends
-    EdgeListVertex<LongWritable, IntWritable, FloatWritable, DoubleWritable>
-    implements Tool {
+public class SimpleVertexWithWorkerContext implements Tool {
   /** Directory name of where to write. */
   public static final String OUTPUTDIR = "svwwc.outputdir";
   /** Halting condition for the number of supersteps */
   private static final int TESTLENGTH = 30;
+  /** Configuration */
+  private Configuration conf;
 
   @Override
-  public void compute(Iterable<DoubleWritable> messages) throws IOException {
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
 
-    long superstep = getSuperstep();
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
 
-    if (superstep < TESTLENGTH) {
-      EmitterWorkerContext emitter =
-          (EmitterWorkerContext) getWorkerContext();
-      emitter.emit("vertexId=" + getId() +
-          " superstep=" + superstep + "\n");
-    } else {
-      voteToHalt();
+  /**
+   * Actual vertex implementation
+   */
+  public static class SimpleVertex extends
+      EdgeListVertex<LongWritable, IntWritable, FloatWritable,
+          DoubleWritable> {
+    @Override
+    public void compute(Iterable<DoubleWritable> messages) throws IOException {
+
+      long superstep = getSuperstep();
+
+      if (superstep < TESTLENGTH) {
+        EmitterWorkerContext emitter =
+            (EmitterWorkerContext) getWorkerContext();
+        emitter.emit("vertexId=" + getId() +
+            " superstep=" + superstep + "\n");
+      } else {
+        voteToHalt();
+      }
     }
   }
 
@@ -152,13 +169,13 @@ public class SimpleVertexWithWorkerConte
           "run: Must have 2 arguments <output path> <# of workers>");
     }
     GiraphJob job = new GiraphJob(getConf(), getClass().getName());
-    job.setVertexClass(getClass());
-    job.setVertexInputFormatClass(
+    job.getConfiguration().setVertexClass(SimpleVertex.class);
+    job.getConfiguration().setVertexInputFormatClass(
         SimpleSuperstepVertexInputFormat.class);
-    job.setWorkerContextClass(EmitterWorkerContext.class);
-    Configuration conf = job.getConfiguration();
-    conf.set(SimpleVertexWithWorkerContext.OUTPUTDIR, args[0]);
-    job.setWorkerConfiguration(Integer.parseInt(args[1]),
+    job.getConfiguration().setWorkerContextClass(EmitterWorkerContext.class);
+    job.getConfiguration().set(
+        SimpleVertexWithWorkerContext.OUTPUTDIR, args[0]);
+    job.getConfiguration().setWorkerConfiguration(Integer.parseInt(args[1]),
         Integer.parseInt(args[1]),
         100.0f);
     if (job.run(true)) {

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Tue Sep 25 17:40:18 2012
@@ -18,13 +18,14 @@
 
 package org.apache.giraph.graph;
 
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedService;
 import org.apache.giraph.graph.partition.GraphPartitionerFactory;
 import org.apache.giraph.zk.BspEvent;
 import org.apache.giraph.zk.PredicateLock;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.giraph.zk.ZooKeeperManager;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
@@ -239,8 +240,8 @@ public abstract class BspService<I exten
   /** Registered list of BspEvents */
   private final List<BspEvent> registeredBspEvents =
       new ArrayList<BspEvent>();
-  /** Configuration of the job*/
-  private final Configuration conf;
+  /** Immutable configuration of the job*/
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
   /** Job context (mainly for progress) */
   private final Mapper<?, ?, ?, ?>.Context context;
   /** Cached superstep (from ZooKeeper) */
@@ -305,10 +306,12 @@ public abstract class BspService<I exten
 
     this.context = context;
     this.graphMapper = graphMapper;
-    this.conf = context.getConfiguration();
+    this.conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+        context.getConfiguration());
     this.jobId = conf.get("mapred.job.id", "Unknown Job");
     this.taskPartition = conf.getInt("mapred.task.partition", -1);
-    this.restartedSuperstep = conf.getLong(GiraphJob.RESTART_SUPERSTEP,
+    this.restartedSuperstep = conf.getLong(
+        GiraphConfiguration.RESTART_SUPERSTEP,
         UNSET_SUPERSTEP);
     this.cachedSuperstep = restartedSuperstep;
     if ((restartedSuperstep != UNSET_SUPERSTEP) &&
@@ -323,12 +326,11 @@ public abstract class BspService<I exten
       throw new RuntimeException(e);
     }
     this.hostnamePartitionId = hostname + "_" + getTaskPartition();
-    this.graphPartitionerFactory =
-        BspUtils.<I, V, E, M>createGraphPartitioner(conf);
+    this.graphPartitionerFactory = conf.createGraphPartitioner();
 
     this.checkpointFrequency =
-        conf.getInt(GiraphJob.CHECKPOINT_FREQUENCY,
-            GiraphJob.CHECKPOINT_FREQUENCY_DEFAULT);
+        conf.getInt(GiraphConfiguration.CHECKPOINT_FREQUENCY,
+            GiraphConfiguration.CHECKPOINT_FREQUENCY_DEFAULT);
 
     basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId;
     masterJobStatePath = basePath + MASTER_JOB_STATE_NODE;
@@ -338,10 +340,9 @@ public abstract class BspService<I exten
     inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE;
     applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
     cleanedUpPath = basePath + CLEANED_UP_DIR;
-    checkpointBasePath =
-        getConfiguration().get(
-            GiraphJob.CHECKPOINT_DIRECTORY,
-            GiraphJob.CHECKPOINT_DIRECTORY_DEFAULT + "/" + getJobId());
+    checkpointBasePath = getConfiguration().get(
+        GiraphConfiguration.CHECKPOINT_DIRECTORY,
+        GiraphConfiguration.CHECKPOINT_DIRECTORY_DEFAULT + "/" + getJobId());
     masterElectionPath = basePath + MASTER_ELECTION_DIR;
     currentMasterPath = basePath + CURRENT_MASTER_INFO_NODE;
     if (LOG.isInfoEnabled()) {
@@ -621,7 +622,8 @@ public abstract class BspService<I exten
     return fs;
   }
 
-  public final Configuration getConfiguration() {
+  public final ImmutableClassesGiraphConfiguration<I, V, E, M>
+  getConfiguration() {
     return conf;
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Tue Sep 25 17:40:18 2012
@@ -19,6 +19,7 @@
 package org.apache.giraph.graph;
 
 import com.google.common.collect.Sets;
+import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.BspInputFormat;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
@@ -168,21 +169,21 @@ public class BspServiceMaster<I extends 
     registerBspEvent(superstepStateChanged);
 
     maxWorkers =
-        getConfiguration().getInt(GiraphJob.MAX_WORKERS, -1);
+        getConfiguration().getInt(GiraphConfiguration.MAX_WORKERS, -1);
     minWorkers =
-        getConfiguration().getInt(GiraphJob.MIN_WORKERS, -1);
+        getConfiguration().getInt(GiraphConfiguration.MIN_WORKERS, -1);
     minPercentResponded =
-        getConfiguration().getFloat(GiraphJob.MIN_PERCENT_RESPONDED,
+        getConfiguration().getFloat(GiraphConfiguration.MIN_PERCENT_RESPONDED,
             100.0f);
     msecsPollPeriod =
-        getConfiguration().getInt(GiraphJob.POLL_MSECS,
-            GiraphJob.POLL_MSECS_DEFAULT);
+        getConfiguration().getInt(GiraphConfiguration.POLL_MSECS,
+            GiraphConfiguration.POLL_MSECS_DEFAULT);
     maxPollAttempts =
-        getConfiguration().getInt(GiraphJob.POLL_ATTEMPTS,
-            GiraphJob.POLL_ATTEMPTS_DEFAULT);
+        getConfiguration().getInt(GiraphConfiguration.POLL_ATTEMPTS,
+            GiraphConfiguration.POLL_ATTEMPTS_DEFAULT);
     partitionLongTailMinPrint = getConfiguration().getInt(
-        GiraphJob.PARTITION_LONG_TAIL_MIN_PRINT,
-        GiraphJob.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT);
+        GiraphConfiguration.PARTITION_LONG_TAIL_MIN_PRINT,
+        GiraphConfiguration.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT);
     masterGraphPartitioner =
         getGraphPartitionerFactory().createMasterGraphPartitioner();
   }
@@ -242,15 +243,16 @@ public class BspServiceMaster<I extends 
    */
   private List<InputSplit> generateInputSplits(int numWorkers) {
     VertexInputFormat<I, V, E, M> vertexInputFormat =
-        BspUtils.<I, V, E, M>createVertexInputFormat(getConfiguration());
+        getConfiguration().createVertexInputFormat();
     List<InputSplit> splits;
     try {
       splits = vertexInputFormat.getSplits(getContext(), numWorkers);
       float samplePercent =
           getConfiguration().getFloat(
-              GiraphJob.INPUT_SPLIT_SAMPLE_PERCENT,
-              GiraphJob.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT);
-      if (samplePercent != GiraphJob.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT) {
+              GiraphConfiguration.INPUT_SPLIT_SAMPLE_PERCENT,
+              GiraphConfiguration.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT);
+      if (samplePercent !=
+          GiraphConfiguration.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT) {
         int lastIndex = (int) (samplePercent * splits.size() / 100f);
         List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
         LOG.warn("generateInputSplits: Using sampling - Processing " +
@@ -285,7 +287,7 @@ public class BspServiceMaster<I extends 
       org.apache.hadoop.mapred.JobClient jobClient =
           new org.apache.hadoop.mapred.JobClient(
               (org.apache.hadoop.mapred.JobConf)
-              getConfiguration());
+              getContext().getConfiguration());
       @SuppressWarnings("deprecation")
       org.apache.hadoop.mapred.JobID jobId =
           org.apache.hadoop.mapred.JobID.forName(getJobId());
@@ -785,10 +787,8 @@ public class BspServiceMaster<I extends 
           currentMasterTaskPartitionCounter.increment(
               getTaskPartition() -
               currentMasterTaskPartitionCounter.getValue());
-          masterCompute =
-              BspUtils.createMasterCompute(getConfiguration());
-          aggregatorWriter =
-              BspUtils.createAggregatorWriter(getConfiguration());
+          masterCompute = getConfiguration().createMasterCompute();
+          aggregatorWriter = getConfiguration().createAggregatorWriter();
           try {
             aggregatorWriter.initialize(getContext(),
                 getApplicationAttempt());
@@ -797,10 +797,9 @@ public class BspServiceMaster<I extends 
                 "Couldn't initialize aggregatorWriter", e);
           }
 
-          boolean useNetty = getConfiguration().getBoolean(GiraphJob.USE_NETTY,
-              GiraphJob.USE_NETTY_DEFAULT);
-          if (useNetty) {
-            commService = new NettyMasterClientServer(getContext());
+          if (getConfiguration().getUseNetty()) {
+            commService = new NettyMasterClientServer(
+                getContext(), getConfiguration());
             masterInfo = new WorkerInfo(getHostname(), getTaskPartition(),
                 commService.getMyAddress().getPort());
             // write my address to znode so workers could read it
@@ -1415,8 +1414,8 @@ public class BspServiceMaster<I extends 
   private void cleanUpOldSuperstep(long removeableSuperstep) throws
       InterruptedException {
     if (!(getConfiguration().getBoolean(
-        GiraphJob.KEEP_ZOOKEEPER_DATA,
-        GiraphJob.KEEP_ZOOKEEPER_DATA_DEFAULT)) &&
+        GiraphConfiguration.KEEP_ZOOKEEPER_DATA,
+        GiraphConfiguration.KEEP_ZOOKEEPER_DATA_DEFAULT)) &&
         (removeableSuperstep >= 0)) {
       String oldSuperstepPath =
           getSuperstepPath(getApplicationAttempt()) + "/" +
@@ -1470,9 +1469,7 @@ public class BspServiceMaster<I extends 
       }
     }
 
-    boolean useNetty = getConfiguration().getBoolean(GiraphJob.USE_NETTY,
-        GiraphJob.USE_NETTY_DEFAULT);
-    if (useNetty) {
+    if (getConfiguration().getUseNetty()) {
       commService.fixWorkerAddresses(chosenWorkerInfoList);
     }
 
@@ -1702,8 +1699,8 @@ public class BspServiceMaster<I extends 
     // and the master can do any final cleanup
     try {
       if (!getConfiguration().getBoolean(
-          GiraphJob.KEEP_ZOOKEEPER_DATA,
-          GiraphJob.KEEP_ZOOKEEPER_DATA_DEFAULT)) {
+          GiraphConfiguration.KEEP_ZOOKEEPER_DATA,
+          GiraphConfiguration.KEEP_ZOOKEEPER_DATA_DEFAULT)) {
         if (LOG.isInfoEnabled()) {
           LOG.info("cleanupZooKeeper: Removing the following path " +
               "and all children - " + basePath);
@@ -1753,8 +1750,8 @@ public class BspServiceMaster<I extends 
       cleanUpZooKeeper();
       // If desired, cleanup the checkpoint directory
       if (getConfiguration().getBoolean(
-          GiraphJob.CLEANUP_CHECKPOINTS_AFTER_SUCCESS,
-          GiraphJob.CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT)) {
+          GiraphConfiguration.CLEANUP_CHECKPOINTS_AFTER_SUCCESS,
+          GiraphConfiguration.CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT)) {
         boolean success =
             getFs().delete(new Path(checkpointBasePath), true);
         if (LOG.isInfoEnabled()) {
@@ -1765,9 +1762,7 @@ public class BspServiceMaster<I extends 
       }
       aggregatorWriter.close();
 
-      boolean useNetty = getConfiguration().getBoolean(GiraphJob.USE_NETTY,
-          GiraphJob.USE_NETTY_DEFAULT);
-      if (useNetty) {
+      if (getConfiguration().getUseNetty()) {
         commService.closeConnections();
         commService.close();
       }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Sep 25 17:40:18 2012
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.graph;
 
+import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.RPCCommunications;
@@ -148,17 +149,19 @@ public class BspServiceWorker<I extends 
         new GiraphTransferRegulator(getConfiguration());
     inputSplitMaxVertices =
         getConfiguration().getLong(
-            GiraphJob.INPUT_SPLIT_MAX_VERTICES,
-            GiraphJob.INPUT_SPLIT_MAX_VERTICES_DEFAULT);
+            GiraphConfiguration.INPUT_SPLIT_MAX_VERTICES,
+            GiraphConfiguration.INPUT_SPLIT_MAX_VERTICES_DEFAULT);
     workerGraphPartitioner =
         getGraphPartitionerFactory().createWorkerGraphPartitioner();
-    boolean useNetty = getConfiguration().getBoolean(GiraphJob.USE_NETTY,
-        GiraphJob.USE_NETTY_DEFAULT);
+    boolean useNetty = getConfiguration().getUseNetty();
     if (useNetty) {
-      commService =  new NettyWorkerClientServer<I, V, E, M>(context, this);
+      commService =  new NettyWorkerClientServer<I, V, E, M>(
+          context, getConfiguration(), this);
     } else {
       commService =
-          new RPCCommunications<I, V, E, M>(context, this, graphState);
+          new RPCCommunications<I, V, E, M>(context, this,
+              getConfiguration(),
+              graphState);
     }
     if (LOG.isInfoEnabled()) {
       LOG.info("BspServiceWorker: maxVerticesPerTransfer = " +
@@ -173,8 +176,7 @@ public class BspServiceWorker<I extends 
 
     graphState.setWorkerCommunications(commService);
     this.workerContext =
-        BspUtils.createWorkerContext(getConfiguration(),
-            graphMapper.getGraphState());
+        getConfiguration().createWorkerContext(graphMapper.getGraphState());
 
     if (useNetty) {
       workerPartitionStore = null;
@@ -445,7 +447,7 @@ public class BspServiceWorker<I extends 
   private VertexEdgeCount readVerticesFromInputSplit(
       InputSplit inputSplit) throws IOException, InterruptedException {
     VertexInputFormat<I, V, E, M> vertexInputFormat =
-        BspUtils.<I, V, E, M>createVertexInputFormat(getConfiguration());
+        getConfiguration().createVertexInputFormat();
     VertexReader<I, V, E, M> vertexReader =
         vertexInputFormat.createVertexReader(inputSplit, getContext());
     vertexReader.initialize(inputSplit, getContext());
@@ -459,8 +461,7 @@ public class BspServiceWorker<I extends 
                 "without an id!  - " + readerVertex);
       }
       if (readerVertex.getValue() == null) {
-        readerVertex.setValue(
-            BspUtils.<V>createVertexValue(getConfiguration()));
+        readerVertex.setValue(getConfiguration().createVertexValue());
       }
       PartitionOwner partitionOwner =
           workerGraphPartitioner.getPartitionOwner(
@@ -947,9 +948,7 @@ public class BspServiceWorker<I extends 
     }
 
 
-    boolean useNetty = getConfiguration().getBoolean(GiraphJob.USE_NETTY,
-        GiraphJob.USE_NETTY_DEFAULT);
-    if (useNetty) {
+    if (getConfiguration().getUseNetty()) {
       // get address of master
       WritableUtils.readFieldsFromZnode(getZkExt(), currentMasterPath, false,
           null, masterInfo);
@@ -1097,15 +1096,15 @@ public class BspServiceWorker<I extends 
    * @throws InterruptedException
    */
   private void saveVertices() throws IOException, InterruptedException {
-    if (getConfiguration().get(GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS) ==
-        null) {
-      LOG.warn("saveVertices: " + GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS +
+    if (getConfiguration().getVertexOutputFormatClass() == null) {
+      LOG.warn("saveVertices: " +
+          GiraphConfiguration.VERTEX_OUTPUT_FORMAT_CLASS +
           " not specified -- there will be no saved output");
       return;
     }
 
     VertexOutputFormat<I, V, E> vertexOutputFormat =
-        BspUtils.<I, V, E>createVertexOutputFormat(getConfiguration());
+        getConfiguration().createVertexOutputFormat();
     VertexWriter<I, V, E> vertexWriter =
         vertexOutputFormat.createVertexWriter(getContext());
     vertexWriter.initialize(getContext());
@@ -1203,8 +1202,7 @@ public class BspServiceWorker<I extends 
       LOG.warn("storeCheckpoint: Removed file " + verticesFilePath);
     }
 
-    boolean useNetty = getConfiguration().getBoolean(GiraphJob.USE_NETTY,
-        GiraphJob.USE_NETTY_DEFAULT);
+    boolean useNetty = getConfiguration().getUseNetty();
     FSDataOutputStream verticesOutputStream =
         getFs().create(verticesFilePath);
     ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
@@ -1272,8 +1270,7 @@ public class BspServiceWorker<I extends 
 
   @Override
   public void loadCheckpoint(long superstep) {
-    if (getConfiguration().getBoolean(GiraphJob.USE_NETTY,
-        GiraphJob.USE_NETTY_DEFAULT)) {
+    if (getConfiguration().getUseNetty()) {
       try {
         // clear old message stores
         getServerData().getIncomingMessageStore().clearAll();

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java Tue Sep 25 17:40:18 2012
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.graph;
 
+import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.graph.partition.GraphPartitionerFactory;
 import org.apache.giraph.graph.partition.HashPartitionerFactory;
 import org.apache.giraph.graph.partition.PartitionStats;
@@ -53,7 +54,7 @@ public class BspUtils {
   Class<? extends GraphPartitionerFactory<I, V, E, M>>
   getGraphPartitionerClass(Configuration conf) {
     return (Class<? extends GraphPartitionerFactory<I, V, E, M>>)
-      conf.getClass(GiraphJob.GRAPH_PARTITIONER_FACTORY_CLASS,
+      conf.getClass(GiraphConfiguration.GRAPH_PARTITIONER_FACTORY_CLASS,
         HashPartitionerFactory.class,
         GraphPartitionerFactory.class);
   }
@@ -116,7 +117,7 @@ public class BspUtils {
   Class<? extends VertexInputFormat<I, V, E, M>>
   getVertexInputFormatClass(Configuration conf) {
     return (Class<? extends VertexInputFormat<I, V, E, M>>)
-      conf.getClass(GiraphJob.VERTEX_INPUT_FORMAT_CLASS,
+      conf.getClass(GiraphConfiguration.VERTEX_INPUT_FORMAT_CLASS,
         null,
         VertexInputFormat.class);
   }
@@ -160,7 +161,7 @@ public class BspUtils {
   Class<? extends VertexOutputFormat<I, V, E>>
   getVertexOutputFormatClass(Configuration conf) {
     return (Class<? extends VertexOutputFormat<I, V, E>>)
-      conf.getClass(GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS,
+      conf.getClass(GiraphConfiguration.VERTEX_OUTPUT_FORMAT_CLASS,
         null,
         VertexOutputFormat.class);
   }
@@ -191,7 +192,7 @@ public class BspUtils {
    */
   public static Class<? extends AggregatorWriter>
   getAggregatorWriterClass(Configuration conf) {
-    return conf.getClass(GiraphJob.AGGREGATOR_WRITER_CLASS,
+    return conf.getClass(GiraphConfiguration.AGGREGATOR_WRITER_CLASS,
       TextAggregatorWriter.class,
       AggregatorWriter.class);
   }
@@ -221,7 +222,7 @@ public class BspUtils {
   Class<? extends VertexCombiner<I, M>>
   getVertexCombinerClass(Configuration conf) {
     return (Class<? extends VertexCombiner<I, M>>)
-      conf.getClass(GiraphJob.VERTEX_COMBINER_CLASS,
+      conf.getClass(GiraphConfiguration.VERTEX_COMBINER_CLASS,
         null,
         VertexCombiner.class);
   }
@@ -259,7 +260,7 @@ public class BspUtils {
   Class<? extends VertexResolver<I, V, E, M>>
   getVertexResolverClass(Configuration conf) {
     return (Class<? extends VertexResolver<I, V, E, M>>)
-      conf.getClass(GiraphJob.VERTEX_RESOLVER_CLASS,
+      conf.getClass(GiraphConfiguration.VERTEX_RESOLVER_CLASS,
         VertexResolver.class,
         VertexResolver.class);
   }
@@ -297,7 +298,7 @@ public class BspUtils {
   public static Class<? extends WorkerContext>
   getWorkerContextClass(Configuration conf) {
     return (Class<? extends WorkerContext>)
-      conf.getClass(GiraphJob.WORKER_CONTEXT_CLASS,
+      conf.getClass(GiraphConfiguration.WORKER_CONTEXT_CLASS,
         DefaultWorkerContext.class,
         WorkerContext.class);
   }
@@ -335,7 +336,7 @@ public class BspUtils {
   public static Class<? extends MasterCompute>
   getMasterComputeClass(Configuration conf) {
     return (Class<? extends MasterCompute>)
-      conf.getClass(GiraphJob.MASTER_COMPUTE_CLASS,
+      conf.getClass(GiraphConfiguration.MASTER_COMPUTE_CLASS,
         DefaultMasterCompute.class,
         MasterCompute.class);
   }
@@ -370,7 +371,7 @@ public class BspUtils {
   E extends Writable, M extends Writable>
   Class<? extends Vertex<I, V, E, M>> getVertexClass(Configuration conf) {
     return (Class<? extends Vertex<I, V, E, M>>)
-      conf.getClass(GiraphJob.VERTEX_CLASS,
+      conf.getClass(GiraphConfiguration.VERTEX_CLASS,
         null,
         Vertex.class);
   }
@@ -405,7 +406,7 @@ public class BspUtils {
   @SuppressWarnings("unchecked")
   public static <I extends Writable> Class<I>
   getVertexIdClass(Configuration conf) {
-    return (Class<I>) conf.getClass(GiraphJob.VERTEX_ID_CLASS,
+    return (Class<I>) conf.getClass(GiraphConfiguration.VERTEX_ID_CLASS,
       WritableComparable.class);
   }
 
@@ -441,7 +442,7 @@ public class BspUtils {
   @SuppressWarnings("unchecked")
   public static <V extends Writable> Class<V>
   getVertexValueClass(Configuration conf) {
-    return (Class<V>) conf.getClass(GiraphJob.VERTEX_VALUE_CLASS,
+    return (Class<V>) conf.getClass(GiraphConfiguration.VERTEX_VALUE_CLASS,
       Writable.class);
   }
 
@@ -481,7 +482,7 @@ public class BspUtils {
   @SuppressWarnings("unchecked")
   public static <E extends Writable> Class<E>
   getEdgeValueClass(Configuration conf) {
-    return (Class<E>) conf.getClass(GiraphJob.EDGE_VALUE_CLASS,
+    return (Class<E>) conf.getClass(GiraphConfiguration.EDGE_VALUE_CLASS,
       Writable.class);
   }
 
@@ -521,7 +522,7 @@ public class BspUtils {
   @SuppressWarnings("unchecked")
   public static <M extends Writable> Class<M>
   getMessageValueClass(Configuration conf) {
-    return (Class<M>) conf.getClass(GiraphJob.MESSAGE_VALUE_CLASS,
+    return (Class<M>) conf.getClass(GiraphConfiguration.MESSAGE_VALUE_CLASS,
       Writable.class);
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java Tue Sep 25 17:40:18 2012
@@ -121,18 +121,18 @@ public abstract class EdgeListVertex<I e
 
   @Override
   public final void readFields(DataInput in) throws IOException {
-    I vertexId = BspUtils.<I>createVertexId(getConf());
+    I vertexId = getConf().createVertexId();
     vertexId.readFields(in);
-    V vertexValue = BspUtils.<V>createVertexValue(getConf());
+    V vertexValue = getConf().createVertexValue();
     vertexValue.readFields(in);
     super.initialize(vertexId, vertexValue);
 
     int numEdges = in.readInt();
     edgeList = Lists.newArrayListWithCapacity(numEdges);
     for (int i = 0; i < numEdges; ++i) {
-      I targetVertexId = BspUtils.<I>createVertexId(getConf());
+      I targetVertexId = getConf().createVertexId();
       targetVertexId.readFields(in);
-      E edgeValue = BspUtils.<E>createEdgeValue(getConf());
+      E edgeValue = getConf().createEdgeValue();
       edgeValue.readFields(in);
       edgeList.add(new Edge<I, E>(targetVertexId, edgeValue));
     }
@@ -140,7 +140,7 @@ public abstract class EdgeListVertex<I e
     int numMessages = in.readInt();
     messageList = Lists.newArrayListWithCapacity(numMessages);
     for (int i = 0; i < numMessages; ++i) {
-      M message = BspUtils.<M>createMessageValue(getConf());
+      M message = getConf().createMessageValue();
       message.readFields(in);
       messageList.add(message);
     }