You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/09/25 19:40:21 UTC

svn commit: r1390014 [3/4] - in /giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/comm/messages/ src/...

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Tue Sep 25 17:40:18 2012
@@ -18,9 +18,10 @@
 
 package org.apache.giraph.graph;
 
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.BspInputFormat;
 import org.apache.giraph.bsp.BspOutputFormat;
-import org.apache.giraph.graph.partition.GraphPartitionerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.mapreduce.Job;
@@ -38,459 +39,38 @@ public class GiraphJob {
     Configuration.addDefaultResource("giraph-site.xml");
   }
 
-  /** Vertex class - required */
-  public static final String VERTEX_CLASS = "giraph.vertexClass";
-  /** VertexInputFormat class - required */
-  public static final String VERTEX_INPUT_FORMAT_CLASS =
-      "giraph.vertexInputFormatClass";
-
-  /** Class for Master - optional */
-  public static final String MASTER_COMPUTE_CLASS = "giraph.masterComputeClass";
-
-  /** VertexOutputFormat class - optional */
-  public static final String VERTEX_OUTPUT_FORMAT_CLASS =
-      "giraph.vertexOutputFormatClass";
-  /** Vertex combiner class - optional */
-  public static final String VERTEX_COMBINER_CLASS =
-      "giraph.combinerClass";
-  /** Vertex resolver class - optional */
-  public static final String VERTEX_RESOLVER_CLASS =
-      "giraph.vertexResolverClass";
-  /** Graph partitioner factory class - optional */
-  public static final String GRAPH_PARTITIONER_FACTORY_CLASS =
-      "giraph.graphPartitionerFactoryClass";
-
-  /** Vertex index class */
-  public static final String VERTEX_ID_CLASS = "giraph.vertexIdClass";
-  /** Vertex value class */
-  public static final String VERTEX_VALUE_CLASS = "giraph.vertexValueClass";
-  /** Edge value class */
-  public static final String EDGE_VALUE_CLASS = "giraph.edgeValueClass";
-  /** Message value class */
-  public static final String MESSAGE_VALUE_CLASS = "giraph.messageValueClass";
-  /** Worker context class */
-  public static final String WORKER_CONTEXT_CLASS =
-      "giraph.workerContextClass";
-  /** AggregatorWriter class - optional */
-  public static final String AGGREGATOR_WRITER_CLASS =
-      "giraph.aggregatorWriterClass";
-
-  /**
-   * Minimum number of simultaneous workers before this job can run (int)
-   */
-  public static final String MIN_WORKERS = "giraph.minWorkers";
-  /**
-   * Maximum number of simultaneous worker tasks started by this job (int).
-   */
-  public static final String MAX_WORKERS = "giraph.maxWorkers";
-
-  /**
-   * Separate the workers and the master tasks.  This is required
-   * to support dynamic recovery. (boolean)
-   */
-  public static final String SPLIT_MASTER_WORKER =
-      "giraph.SplitMasterWorker";
-  /**
-   * Default on whether to separate the workers and the master tasks.
-   * Needs to be "true" to support dynamic recovery.
-   */
-  public static final boolean SPLIT_MASTER_WORKER_DEFAULT = true;
-
-  /** Indicates whether this job is run in an internal unit test */
-  public static final String LOCAL_TEST_MODE =
-      "giraph.localTestMode";
-
-  /** not in local test mode per default */
-  public static final boolean LOCAL_TEST_MODE_DEFAULT = false;
-
-  /** Override the Hadoop log level and set the desired log level. */
-  public static final String LOG_LEVEL = "giraph.logLevel";
-  /** Default log level is INFO (same as Hadoop) */
-  public static final String LOG_LEVEL_DEFAULT = "info";
-
-  /**
-   * Minimum percent of the maximum number of workers that have responded
-   * in order to continue progressing. (float)
-   */
-  public static final String MIN_PERCENT_RESPONDED =
-      "giraph.minPercentResponded";
-  /** Default 100% response rate for workers */
-  public static final float MIN_PERCENT_RESPONDED_DEFAULT = 100.0f;
-
-  /** Polling timeout to check on the number of responded tasks (int) */
-  public static final String POLL_MSECS = "giraph.pollMsecs";
-  /** Default poll msecs (30 seconds) */
-  public static final int POLL_MSECS_DEFAULT = 30 * 1000;
-
-  /**
-   *  ZooKeeper comma-separated list (if not set,
-   *  will start up ZooKeeper locally)
-   */
-  public static final String ZOOKEEPER_LIST = "giraph.zkList";
-
-  /** ZooKeeper session millisecond timeout */
-  public static final String ZOOKEEPER_SESSION_TIMEOUT =
-      "giraph.zkSessionMsecTimeout";
-  /** Default Zookeeper session millisecond timeout */
-  public static final int ZOOKEEPER_SESSION_TIMEOUT_DEFAULT = 60 * 1000;
-
-  /** Polling interval to check for the final ZooKeeper server data */
-  public static final String ZOOKEEPER_SERVERLIST_POLL_MSECS =
-      "giraph.zkServerlistPollMsecs";
-  /** Default polling interval to check for the final ZooKeeper server data */
-  public static final int ZOOKEEPER_SERVERLIST_POLL_MSECS_DEFAULT =
-      3 * 1000;
-
-  /** Number of nodes (not tasks) to run Zookeeper on */
-  public static final String ZOOKEEPER_SERVER_COUNT =
-      "giraph.zkServerCount";
-  /** Default number of nodes to run Zookeeper on */
-  public static final int ZOOKEEPER_SERVER_COUNT_DEFAULT = 1;
-
-  /** ZooKeeper port to use */
-  public static final String ZOOKEEPER_SERVER_PORT =
-      "giraph.zkServerPort";
-  /** Default ZooKeeper port to use */
-  public static final int ZOOKEEPER_SERVER_PORT_DEFAULT = 22181;
-
-  /** Location of the ZooKeeper jar - Used internally, not meant for users */
-  public static final String ZOOKEEPER_JAR = "giraph.zkJar";
-
-  /** Local ZooKeeper directory to use */
-  public static final String ZOOKEEPER_DIR = "giraph.zkDir";
-
-  /** Use the RPC communication or netty communication */
-  public static final String USE_NETTY = "giraph.useNetty";
-  /** Default is to use RPC, not netty */
-  public static final boolean USE_NETTY_DEFAULT = false;
-
-  /** TCP backlog (defaults to number of workers) */
-  public static final String TCP_BACKLOG = "giraph.tcpBacklog";
-  /**
-   * Default TCP backlog default if the number of workers is not specified
-   * (i.e unittests)
-   */
-  public static final int TCP_BACKLOG_DEFAULT = 1;
-
-  /** Netty simulate a first request closed */
-  public static final String NETTY_SIMULATE_FIRST_REQUEST_CLOSED =
-      "giraph.nettySimulateFirstRequestClosed";
-  /** Default of not simulating failure for first request */
-  public static final boolean NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT =
-      false;
-
-  /** Netty simulate a first response failed */
-  public static final String NETTY_SIMULATE_FIRST_RESPONSE_FAILED =
-      "giraph.nettySimulateFirstResponseFailed";
-  /** Default of not simulating failure for first reponse */
-  public static final boolean NETTY_SIMULATE_FIRST_RESPONSE_FAILED_DEFAULT =
-      false;
-
-  /** Maximum number of reconnection attempts */
-  public static final String MAX_RECONNECT_ATTEMPTS =
-      "giraph.maxNumberOfOpenRequests";
-  /** Default maximum number of reconnection attempts */
-  public static final int MAX_RECONNECT_ATTEMPTS_DEFAULT = 10;
-
-  /** Max resolve address attempts */
-  public static final String MAX_RESOLVE_ADDRESS_ATTEMPTS =
-      "giraph.maxResolveAddressAttempts";
-  /** Default max resolve address attempts */
-  public static final int MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT = 5;
-
-  /** Msecs to wait between waiting for all requests to finish */
-  public static final String WAITING_REQUEST_MSECS =
-      "giraph.waitingRequestMsecs";
-  /** Default msecs to wait between waiting for all requests to finish */
-  public static final int WAITING_REQUEST_MSECS_DEFAULT = 15000;
-
-  /** Milliseconds for a request to complete (or else resend) */
-  public static final String MAX_REQUEST_MILLISECONDS =
-      "giraph.maxRequestMilliseconds";
-  /** Maximum number of milliseconds for a request to complete (10 minutes) */
-  public static final int MAX_REQUEST_MILLISECONDS_DEFAULT = 10 * 60 * 1000;
-
-  /** Netty max connection failures */
-  public static final String NETTY_MAX_CONNECTION_FAILURES =
-      "giraph.nettyMaxConnectionFailures";
-  /** Default Netty max connection failures */
-  public static final int NETTY_MAX_CONNECTION_FAILURES_DEFAULT = 1000;
-
-  /** Initial port to start using for the RPC communication */
-  public static final String RPC_INITIAL_PORT = "giraph.rpcInitialPort";
-  /** Default port to start using for the RPC communication */
-  public static final int RPC_INITIAL_PORT_DEFAULT = 30000;
-
-  /** Maximum bind attempts for different RPC ports */
-  public static final String MAX_RPC_PORT_BIND_ATTEMPTS =
-      "giraph.maxRpcPortBindAttempts";
-  /** Default maximum bind attempts for different RPC ports */
-  public static final int MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT = 20;
-  /**
-   * Fail first RPC port binding attempt, simulate binding failure
-   * on real grid testing
-   */
-  public static final String FAIL_FIRST_RPC_PORT_BIND_ATTEMPT =
-      "giraph.failFirstRpcPortBindAttempt";
-  /** Default fail first RPC port binding attempt flag */
-  public static final boolean FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT = false;
-
-  /** Maximum number of RPC handlers */
-  public static final String RPC_NUM_HANDLERS = "giraph.rpcNumHandlers";
-  /** Default maximum number of RPC handlers */
-  public static final int RPC_NUM_HANDLERS_DEFAULT = 100;
-
-  /** Client send buffer size */
-  public static final String CLIENT_SEND_BUFFER_SIZE =
-      "giraph.clientSendBufferSize";
-  /** Default client send buffer size of 0.5 MB */
-  public static final int DEFAULT_CLIENT_SEND_BUFFER_SIZE = 512 * 1024;
-
-  /** Client receive buffer size */
-  public static final String CLIENT_RECEIVE_BUFFER_SIZE =
-      "giraph.clientReceiveBufferSize";
-  /** Default client receive buffer size of 32 k */
-  public static final int DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE = 32 * 1024;
-
-  /** Server send buffer size */
-  public static final String SERVER_SEND_BUFFER_SIZE =
-      "giraph.serverSendBufferSize";
-  /** Default server send buffer size of 32 k */
-  public static final int DEFAULT_SERVER_SEND_BUFFER_SIZE = 32 * 1024;
-
-  /** Server receive buffer size */
-  public static final String SERVER_RECEIVE_BUFFER_SIZE =
-      "giraph.serverReceiveBufferSize";
-  /** Default server receive buffer size of 0.5 MB */
-  public static final int DEFAULT_SERVER_RECEIVE_BUFFER_SIZE = 512 * 1024;
-
-  /** Maximum number of messages per peer before flush */
-  public static final String MSG_SIZE = "giraph.msgSize";
-  /** Default maximum number of messages per peer before flush */
-  public static final int MSG_SIZE_DEFAULT = 2000;
-
-  /** Maximum number of mutations per partition before flush */
-  public static final String MAX_MUTATIONS_PER_REQUEST =
-      "giraph.maxMutationsPerRequest";
-  /** Default maximum number of mutations per partition before flush */
-  public static final int MAX_MUTATIONS_PER_REQUEST_DEFAULT = 100;
-
-  /** Maximum number of messages that can be bulk sent during a flush */
-  public static final String MAX_MESSAGES_PER_FLUSH_PUT =
-      "giraph.maxMessagesPerFlushPut";
-  /** Default number of messages that can be bulk sent during a flush */
-  public static final int DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT = 2000;
-
-  /** Number of channels used per server */
-  public static final String CHANNELS_PER_SERVER =
-      "giraph.channelsPerServer";
-  /** Default number of channels used per server of 1 */
-  public static final int DEFAULT_CHANNELS_PER_SERVER = 1;
-
-  /** Number of flush threads per peer */
-  public static final String MSG_NUM_FLUSH_THREADS =
-      "giraph.msgNumFlushThreads";
-
-  /** Number of poll attempts prior to failing the job (int) */
-  public static final String POLL_ATTEMPTS = "giraph.pollAttempts";
-  /** Default poll attempts */
-  public static final int POLL_ATTEMPTS_DEFAULT = 10;
-
-  /** Number of minimum vertices in each vertex range */
-  public static final String MIN_VERTICES_PER_RANGE =
-      "giraph.minVerticesPerRange";
-  /** Default number of minimum vertices in each vertex range */
-  public static final long MIN_VERTICES_PER_RANGE_DEFAULT = 3;
-
-  /** Minimum stragglers of the superstep before printing them out */
-  public static final String PARTITION_LONG_TAIL_MIN_PRINT =
-      "giraph.partitionLongTailMinPrint";
-  /** Only print stragglers with one as a default */
-  public static final int PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT = 1;
-
-  /** Use superstep counters? (boolean) */
-  public static final String USE_SUPERSTEP_COUNTERS =
-      "giraph.useSuperstepCounters";
-  /** Default is to use the superstep counters */
-  public static final boolean USE_SUPERSTEP_COUNTERS_DEFAULT = true;
-
-  /**
-   * Set the multiplicative factor of how many partitions to create from
-   * a single InputSplit based on the number of total InputSplits.  For
-   * example, if there are 10 total InputSplits and this is set to 0.5, then
-   * you will get 0.5 * 10 = 5 partitions for every InputSplit (given that the
-   * minimum size is met).
-   */
-  public static final String TOTAL_INPUT_SPLIT_MULTIPLIER =
-      "giraph.totalInputSplitMultiplier";
-  /** Default total input split multiplier */
-  public static final float TOTAL_INPUT_SPLIT_MULTIPLIER_DEFAULT = 0.5f;
-
-  /**
-   * Input split sample percent - Used only for sampling and testing, rather
-   * than an actual job.  The idea is that to test, you might only want a
-   * fraction of the actual input splits from your VertexInputFormat to
-   * load (values should be [0, 100]).
-   */
-  public static final String INPUT_SPLIT_SAMPLE_PERCENT =
-      "giraph.inputSplitSamplePercent";
-  /** Default is to use all the input splits */
-  public static final float INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT = 100f;
-
-  /**
-   * To limit outlier input splits from producing too many vertices or to
-   * help with testing, the number of vertices loaded from an input split can
-   * be limited.  By default, everything is loaded.
-   */
-  public static final String INPUT_SPLIT_MAX_VERTICES =
-      "giraph.InputSplitMaxVertices";
-  /**
-   * Default is that all the vertices are to be loaded from the input
-   * split
-   */
-  public static final long INPUT_SPLIT_MAX_VERTICES_DEFAULT = -1;
-
-  /** Java opts passed to ZooKeeper startup */
-  public static final String ZOOKEEPER_JAVA_OPTS =
-      "giraph.zkJavaOpts";
-  /** Default java opts passed to ZooKeeper startup */
-  public static final String ZOOKEEPER_JAVA_OPTS_DEFAULT =
-      "-Xmx512m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC " +
-          "-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxGCPauseMillis=100";
-
-  /**
-   *  How often to checkpoint (i.e. 0, means no checkpoint,
-   *  1 means every superstep, 2 is every two supersteps, etc.).
-   */
-  public static final String CHECKPOINT_FREQUENCY =
-      "giraph.checkpointFrequency";
-
-  /** Default checkpointing frequency of none. */
-  public static final int CHECKPOINT_FREQUENCY_DEFAULT = 0;
-
-  /**
-   * Delete checkpoints after a successful job run?
-   */
-  public static final String CLEANUP_CHECKPOINTS_AFTER_SUCCESS =
-      "giraph.cleanupCheckpointsAfterSuccess";
-  /** Default is to clean up the checkponts after a successful job */
-  public static final boolean CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT =
-      true;
-
-  /**
-   * An application can be restarted manually by selecting a superstep.  The
-   * corresponding checkpoint must exist for this to work.  The user should
-   * set a long value.  Default is start from scratch.
-   */
-  public static final String RESTART_SUPERSTEP = "giraph.restartSuperstep";
-
-  /**
-   * Base ZNode for Giraph's state in the ZooKeeper cluster.  Must be a root
-   * znode on the cluster beginning with "/"
-   */
-  public static final String BASE_ZNODE_KEY = "giraph.zkBaseZNode";
-
-  /**
-   * If ZOOKEEPER_LIST is not set, then use this directory to manage
-   * ZooKeeper
-   */
-  public static final String ZOOKEEPER_MANAGER_DIRECTORY =
-      "giraph.zkManagerDirectory";
-  /**
-   * Default ZooKeeper manager directory (where determining the servers in
-   * HDFS files will go).  Final directory path will also have job number
-   * for uniqueness.
-   */
-  public static final String ZOOKEEPER_MANAGER_DIR_DEFAULT =
-      "_bsp/_defaultZkManagerDir";
-
-  /** This directory has/stores the available checkpoint files in HDFS. */
-  public static final String CHECKPOINT_DIRECTORY =
-      "giraph.checkpointDirectory";
-  /**
-   * Default checkpoint directory (where checkpoing files go in HDFS).  Final
-   * directory path will also have the job number for uniqueness
-   */
-  public static final String CHECKPOINT_DIRECTORY_DEFAULT =
-      "_bsp/_checkpoints/";
-
-  /** Directory in the local file system for out-of-core messages. */
-  public static final String MESSAGES_DIRECTORY = "giraph.messagesDirectory";
-  /**
-   * Default messages directory. Final directory path will also have the
-   * job number for uniqueness
-   */
-  public static final String MESSAGES_DIRECTORY_DEFAULT = "_bsp/_messages/";
-
-  /** Whether or not to use out-of-core messages */
-  public static final String USE_OUT_OF_CORE_MESSAGES =
-      "giraph.useOutOfCoreMessages";
-  /** Default choice about using out-of-core messaging */
-  public static final boolean USE_OUT_OF_CORE_MESSAGES_DEFAULT = false;
-  /**
-   * If using out-of-core messaging, it tells how much messages do we keep
-   * in memory.
-   */
-  public static final String MAX_MESSAGES_IN_MEMORY =
-      "giraph.maxMessagesInMemory";
-  /** Default maximum number of messages in memory. */
-  public static final int MAX_MESSAGES_IN_MEMORY_DEFAULT = 1000000;
-  /** Size of buffer when reading and writing messages out-of-core. */
-  public static final String MESSAGES_BUFFER_SIZE =
-      "giraph.messagesBufferSize";
-  /** Default size of buffer when reading and writing messages out-of-core. */
-  public static final int MESSAGES_BUFFER_SIZE_DEFAULT = 8192;
-
-  /** Directory in the local filesystem for out-of-core partitions. */
-  public static final String PARTITIONS_DIRECTORY =
-      "giraph.partitionsDirectory";
-  /** Default directory for out-of-core partitions. */
-  public static final String PARTITIONS_DIRECTORY_DEFAULT = "_bsp/_partitions";
-
-  /** Enable out-of-core graph. */
-  public static final String USE_OUT_OF_CORE_GRAPH =
-      "giraph.useOutOfCoreGraph";
-  /** Default is not to use out-of-core graph. */
-  public static final boolean USE_OUT_OF_CORE_GRAPH_DEFAULT = false;
-
-  /** Maximum number of partitions to hold in memory for each worker. */
-  public static final String MAX_PARTITIONS_IN_MEMORY =
-      "giraph.maxPartitionsInMemory";
-  /** Default maximum number of in-memory partitions. */
-  public static final int MAX_PARTITIONS_IN_MEMORY_DEFAULT = 10;
-
-  /** Keep the zookeeper output for debugging? Default is to remove it. */
-  public static final String KEEP_ZOOKEEPER_DATA =
-      "giraph.keepZooKeeperData";
-  /** Default is to remove ZooKeeper data. */
-  public static final Boolean KEEP_ZOOKEEPER_DATA_DEFAULT = false;
-
-  /** Default ZooKeeper tick time. */
-  public static final int DEFAULT_ZOOKEEPER_TICK_TIME = 6000;
-  /** Default ZooKeeper init limit (in ticks). */
-  public static final int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10;
-  /** Default ZooKeeper sync limit (in ticks). */
-  public static final int DEFAULT_ZOOKEEPER_SYNC_LIMIT = 5;
-  /** Default ZooKeeper snap count. */
-  public static final int DEFAULT_ZOOKEEPER_SNAP_COUNT = 50000;
-  /** Default ZooKeeper maximum client connections. */
-  public static final int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 10000;
-  /** ZooKeeper minimum session timeout */
-  public static final String ZOOKEEPER_MIN_SESSION_TIMEOUT =
-      "giraph.zookeeperMinSessionTimeout";
-  /** Default ZooKeeper minimum session timeout of 5 minutes (in msecs). */
-  public static final int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 300 * 1000;
-  /** Default ZooKeeper maximum session timeout of 10 minutes (in msecs). */
-  public static final int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 600 * 1000;
-
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(GiraphJob.class);
-
-  /** Internal job that actually is submitted */
-  private final Job job;
+  /** Internal delegated job to proxy interface requests for Job */
+  private final DelegatedJob delegatedJob;
+  /** Name of the job */
+  private final String jobName;
   /** Helper configuration from the job */
-  private final Configuration conf;
+  private final GiraphConfiguration giraphConfiguration;
 
+  /**
+   * Delegated job that simply passes along the class GiraphConfiguration.
+   */
+  private class DelegatedJob extends Job {
+    /** Ensure that for job initiation the super.getConfiguration() is used */
+    private boolean jobInited = false;
+
+    /**
+     * Constructor
+     *
+     * @throws IOException
+     */
+    DelegatedJob() throws IOException { }
+
+    @Override
+    public Configuration getConfiguration() {
+      if (jobInited) {
+        return giraphConfiguration;
+      } else {
+        return super.getConfiguration();
+      }
+    }
+  }
 
   /**
    * Constructor that will instantiate the configuration
@@ -499,19 +79,33 @@ public class GiraphJob {
    * @throws IOException
    */
   public GiraphJob(String jobName) throws IOException {
-    this(new Configuration(), jobName);
+    this(new GiraphConfiguration(), jobName);
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param configuration User-defined configuration
+   * @param jobName User-defined job name
+   * @throws IOException
+   */
+  public GiraphJob(Configuration configuration,
+                   String jobName) throws IOException {
+    this(new GiraphConfiguration(configuration), jobName);
   }
 
   /**
    * Constructor.
    *
-   * @param conf User-defined configuration
+   * @param giraphConfiguration User-defined configuration
    * @param jobName User-defined job name
    * @throws IOException
    */
-  public GiraphJob(Configuration conf, String jobName) throws IOException {
-    job = new Job(conf, jobName);
-    this.conf = job.getConfiguration();
+  public GiraphJob(GiraphConfiguration giraphConfiguration,
+                   String jobName) throws IOException {
+    this.jobName = jobName;
+    this.giraphConfiguration = giraphConfiguration;
+    this.delegatedJob = new DelegatedJob();
   }
 
   /**
@@ -519,8 +113,8 @@ public class GiraphJob {
    *
    * @return Configuration used by the job.
    */
-  public Configuration getConfiguration() {
-    return conf;
+  public GiraphConfiguration getConfiguration() {
+    return giraphConfiguration;
   }
 
   /**
@@ -531,176 +125,47 @@ public class GiraphJob {
    * @return Internal job that will actually be submitted to Hadoop.
    */
   public Job getInternalJob() {
-    return job;
+    delegatedJob.jobInited = true;
+    return delegatedJob;
   }
   /**
    * Make sure the configuration is set properly by the user prior to
    * submitting the job.
+   *
+   * @param conf Configuration to check
    */
-  private void checkConfiguration() {
-    if (conf.getInt(MAX_WORKERS, -1) < 0) {
-      throw new RuntimeException("No valid " + MAX_WORKERS);
-    }
-    if (conf.getFloat(MIN_PERCENT_RESPONDED,
-        MIN_PERCENT_RESPONDED_DEFAULT) <= 0.0f ||
-        conf.getFloat(MIN_PERCENT_RESPONDED,
-            MIN_PERCENT_RESPONDED_DEFAULT) > 100.0f) {
-      throw new IllegalArgumentException(
-          "Invalid " +
-              conf.getFloat(MIN_PERCENT_RESPONDED,
-                  MIN_PERCENT_RESPONDED_DEFAULT) + " for " +
-                  MIN_PERCENT_RESPONDED);
-    }
-    if (conf.getInt(MIN_WORKERS, -1) < 0) {
-      throw new IllegalArgumentException("No valid " + MIN_WORKERS);
+  private void checkConfiguration(ImmutableClassesGiraphConfiguration conf) {
+    if (conf.getMaxWorkers() < 0) {
+      throw new RuntimeException("checkConfiguration: No valid " +
+          GiraphConfiguration.MAX_WORKERS);
     }
-    if (BspUtils.getVertexClass(getConfiguration()) == null) {
-      throw new IllegalArgumentException("GiraphJob: Null VERTEX_CLASS");
-    }
-    if (BspUtils.getVertexInputFormatClass(getConfiguration()) == null) {
+    if (conf.getMinPercentResponded() <= 0.0f ||
+        conf.getMinPercentResponded() > 100.0f) {
       throw new IllegalArgumentException(
-          "GiraphJob: Null VERTEX_INPUT_FORMAT_CLASS");
+          "checkConfiguration: Invalid " + conf.getMinPercentResponded() +
+              " for " + GiraphConfiguration.MIN_PERCENT_RESPONDED);
+    }
+    if (conf.getMinWorkers() < 0) {
+      throw new IllegalArgumentException("checkConfiguration: No valid " +
+          GiraphConfiguration.MIN_WORKERS);
+    }
+    if (conf.getVertexClass() == null) {
+      throw new IllegalArgumentException("checkConfiguration: Null " +
+          GiraphConfiguration.VERTEX_CLASS);
+    }
+    if (conf.getVertexInputFormatClass() == null) {
+      throw new IllegalArgumentException("checkConfiguration: Null " +
+          GiraphConfiguration.VERTEX_INPUT_FORMAT_CLASS);
     }
-    if (BspUtils.getVertexResolverClass(getConfiguration()) == null) {
-      setVertexResolverClass(VertexResolver.class);
+    if (conf.getVertexResolverClass() == null) {
       if (LOG.isInfoEnabled()) {
-        LOG.info("GiraphJob: No class found for " +
-            VERTEX_RESOLVER_CLASS + ", defaulting to " +
+        LOG.info("checkConfiguration: No class found for " +
+            GiraphConfiguration.VERTEX_RESOLVER_CLASS + ", defaulting to " +
             VertexResolver.class.getCanonicalName());
       }
     }
   }
 
-  /**
-   * Set the vertex class (required)
-   *
-   * @param vertexClass Runs vertex computation
-   */
-  public final void setVertexClass(Class<?> vertexClass) {
-    getConfiguration().setClass(VERTEX_CLASS, vertexClass, Vertex.class);
-  }
-
-  /**
-   * Set the vertex input format class (required)
-   *
-   * @param vertexInputFormatClass Determines how graph is input
-   */
-  public final void setVertexInputFormatClass(
-      Class<?> vertexInputFormatClass) {
-    getConfiguration().setClass(VERTEX_INPUT_FORMAT_CLASS,
-        vertexInputFormatClass,
-        VertexInputFormat.class);
-  }
-
-  /**
-   * Set the master class (optional)
-   *
-   * @param masterComputeClass Runs master computation
-   */
-  public final void setMasterComputeClass(Class<?> masterComputeClass) {
-    getConfiguration().setClass(MASTER_COMPUTE_CLASS, masterComputeClass,
-        MasterCompute.class);
-  }
-
-  /**
-   * Set the vertex output format class (optional)
-   *
-   * @param vertexOutputFormatClass Determines how graph is output
-   */
-  public final void setVertexOutputFormatClass(
-      Class<?> vertexOutputFormatClass) {
-    getConfiguration().setClass(VERTEX_OUTPUT_FORMAT_CLASS,
-        vertexOutputFormatClass,
-        VertexOutputFormat.class);
-  }
-
-  /**
-   * Set the vertex combiner class (optional)
-   *
-   * @param vertexCombinerClass Determines how vertex messages are combined
-   */
-  public final void setVertexCombinerClass(Class<?> vertexCombinerClass) {
-    getConfiguration().setClass(VERTEX_COMBINER_CLASS,
-        vertexCombinerClass,
-        VertexCombiner.class);
-  }
-
-  /**
-   * Set the graph partitioner class (optional)
-   *
-   * @param graphPartitionerFactoryClass Determines how the graph is partitioned
-   */
-  public final void setGraphPartitionerFactoryClass(
-      Class<?> graphPartitionerFactoryClass) {
-    getConfiguration().setClass(GRAPH_PARTITIONER_FACTORY_CLASS,
-        graphPartitionerFactoryClass,
-        GraphPartitionerFactory.class);
-  }
-
-  /**
-   * Set the vertex resolver class (optional)
-   *
-   * @param vertexResolverClass Determines how vertex mutations are resolved
-   */
-  public final void setVertexResolverClass(Class<?> vertexResolverClass) {
-    getConfiguration().setClass(VERTEX_RESOLVER_CLASS,
-        vertexResolverClass,
-        VertexResolver.class);
-  }
-
-  /**
-   * Set the worker context class (optional)
-   *
-   * @param workerContextClass Determines what code is executed on a each
-   *        worker before and after each superstep and computation
-   */
-  public final void setWorkerContextClass(Class<?> workerContextClass) {
-    getConfiguration().setClass(WORKER_CONTEXT_CLASS,
-        workerContextClass,
-        WorkerContext.class);
-  }
-
-  /**
-   * Set the aggregator writer class (optional)
-   *
-   * @param aggregatorWriterClass Determines how the aggregators are
-   *        written to file at the end of the job
-   */
-  public final void setAggregatorWriterClass(
-      Class<?> aggregatorWriterClass) {
-    getConfiguration().setClass(AGGREGATOR_WRITER_CLASS,
-        aggregatorWriterClass,
-        AggregatorWriter.class);
-  }
-
-  /**
-   * Set worker configuration for determining what is required for
-   * a superstep.
-   *
-   * @param minWorkers Minimum workers to do a superstep
-   * @param maxWorkers Maximum workers to do a superstep
-   *        (max map tasks in job)
-   * @param minPercentResponded 0 - 100 % of the workers required to
-   *        have responded before continuing the superstep
-   */
-  public final void setWorkerConfiguration(int minWorkers,
-      int maxWorkers,
-      float minPercentResponded) {
-    conf.setInt(MIN_WORKERS, minWorkers);
-    conf.setInt(MAX_WORKERS, maxWorkers);
-    conf.setFloat(MIN_PERCENT_RESPONDED, minPercentResponded);
-  }
-
-  /**
-   * Utilize an existing ZooKeeper service.  If this is not set, ZooKeeper
-   * will be dynamically started by Giraph for this job.
-   *
-   * @param serverList Comma separated list of servers and ports
-   *        (i.e. zk1:2221,zk2:2221)
-   */
-  public final void setZooKeeperConfiguration(String serverList) {
-    conf.set(ZOOKEEPER_LIST, serverList);
-  }
 
   /**
    * Check if the configuration is local.  If it is local, do additional
@@ -709,22 +174,21 @@ public class GiraphJob {
    * @param conf Configuration
    */
   private static void checkLocalJobRunnerConfiguration(
-      Configuration conf) {
+      GiraphConfiguration conf) {
     String jobTracker = conf.get("mapred.job.tracker", null);
     if (!jobTracker.equals("local")) {
       // Nothing to check
       return;
     }
 
-    int maxWorkers = conf.getInt(MAX_WORKERS, -1);
+    int maxWorkers = conf.getMaxWorkers();
     if (maxWorkers != 1) {
       throw new IllegalArgumentException(
           "checkLocalJobRunnerConfiguration: When using " +
               "LocalJobRunner, must have only one worker since " +
           "only 1 task at a time!");
     }
-    if (conf.getBoolean(SPLIT_MASTER_WORKER,
-        SPLIT_MASTER_WORKER_DEFAULT)) {
+    if (conf.getSplitMasterWorker()) {
       throw new IllegalArgumentException(
           "checkLocalJobRunnerConfiguration: When using " +
               "LocalJobRunner, you cannot run in split master / worker " +
@@ -739,8 +203,9 @@ public class GiraphJob {
    * @param defaultValue Assign to value if not set
    */
   private void setIntConfIfDefault(String param, int defaultValue) {
-    if (conf.getInt(param, Integer.MIN_VALUE) == Integer.MIN_VALUE) {
-      conf.setInt(param, defaultValue);
+    if (giraphConfiguration.getInt(param, Integer.MIN_VALUE) ==
+        Integer.MIN_VALUE) {
+      giraphConfiguration.setInt(param, defaultValue);
     }
   }
 
@@ -755,9 +220,6 @@ public class GiraphJob {
    */
   public final boolean run(boolean verbose)
     throws IOException, InterruptedException, ClassNotFoundException {
-    checkConfiguration();
-    checkLocalJobRunnerConfiguration(conf);
-    job.setNumReduceTasks(0);
     // Most users won't hit this hopefully and can set it higher if desired
     setIntConfIfDefault("mapreduce.job.counters.limit", 512);
 
@@ -767,22 +229,30 @@ public class GiraphJob {
     setIntConfIfDefault("mapred.job.reduce.memory.mb", 1024);
 
     // Speculative execution doesn't make sense for Giraph
-    conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+    giraphConfiguration.setBoolean(
+        "mapred.map.tasks.speculative.execution", false);
 
     // Set the ping interval to 5 minutes instead of one minute
     // (DEFAULT_PING_INTERVAL)
-    Client.setPingInterval(conf, 60000 * 5);
+    Client.setPingInterval(giraphConfiguration, 60000 * 5);
 
-    if (job.getJar() == null) {
-      job.setJarByClass(GiraphJob.class);
-    }
     // Should work in MAPREDUCE-1938 to let the user jars/classes
     // get loaded first
-    conf.setBoolean("mapreduce.user.classpath.first", true);
+    giraphConfiguration.setBoolean("mapreduce.user.classpath.first", true);
 
-    job.setMapperClass(GraphMapper.class);
-    job.setInputFormatClass(BspInputFormat.class);
-    job.setOutputFormatClass(BspOutputFormat.class);
-    return job.waitForCompletion(verbose);
+    // Set the job properties, check them, and submit the job
+    ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration =
+        new ImmutableClassesGiraphConfiguration(giraphConfiguration);
+    checkConfiguration(immutableClassesGiraphConfiguration);
+    checkLocalJobRunnerConfiguration(immutableClassesGiraphConfiguration);
+    Job submittedJob = new Job(immutableClassesGiraphConfiguration, jobName);
+    if (submittedJob.getJar() == null) {
+      submittedJob.setJarByClass(GiraphJob.class);
+    }
+    submittedJob.setNumReduceTasks(0);
+    submittedJob.setMapperClass(GraphMapper.class);
+    submittedJob.setInputFormatClass(BspInputFormat.class);
+    submittedJob.setOutputFormatClass(BspOutputFormat.class);
+    return submittedJob.waitForCompletion(verbose);
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Tue Sep 25 17:40:18 2012
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.graph;
 
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.MessageStoreByPartition;
@@ -75,7 +77,7 @@ public class GraphMapper<I extends Writa
   /** Manages the ZooKeeper servers if necessary (dynamic startup) */
   private ZooKeeperManager zkManager;
   /** Configuration */
-  private Configuration conf;
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
   /** Already complete? */
   private boolean done = false;
   /** What kind of functions is this mapper doing? */
@@ -157,25 +159,25 @@ public class GraphMapper<I extends Writa
    */
   public void determineClassTypes(Configuration conf) {
     Class<? extends Vertex<I, V, E, M>> vertexClass =
-      BspUtils.<I, V, E, M>getVertexClass(conf);
+        BspUtils.<I, V, E, M>getVertexClass(conf);
     List<Class<?>> classList = ReflectionUtils.<Vertex>getTypeArguments(
-      Vertex.class, vertexClass);
+        Vertex.class, vertexClass);
     Type vertexIndexType = classList.get(0);
     Type vertexValueType = classList.get(1);
     Type edgeValueType = classList.get(2);
     Type messageValueType = classList.get(3);
-    conf.setClass(GiraphJob.VERTEX_ID_CLASS,
-      (Class<?>) vertexIndexType,
-      WritableComparable.class);
-    conf.setClass(GiraphJob.VERTEX_VALUE_CLASS,
-      (Class<?>) vertexValueType,
-      Writable.class);
-    conf.setClass(GiraphJob.EDGE_VALUE_CLASS,
-      (Class<?>) edgeValueType,
-      Writable.class);
-    conf.setClass(GiraphJob.MESSAGE_VALUE_CLASS,
-      (Class<?>) messageValueType,
-      Writable.class);
+    conf.setClass(GiraphConfiguration.VERTEX_ID_CLASS,
+        (Class<?>) vertexIndexType,
+        WritableComparable.class);
+    conf.setClass(GiraphConfiguration.VERTEX_VALUE_CLASS,
+        (Class<?>) vertexValueType,
+        Writable.class);
+    conf.setClass(GiraphConfiguration.EDGE_VALUE_CLASS,
+        (Class<?>) edgeValueType,
+        Writable.class);
+    conf.setClass(GiraphConfiguration.MESSAGE_VALUE_CLASS,
+        (Class<?>) messageValueType,
+        Writable.class);
   }
 
     /**
@@ -223,14 +225,11 @@ public class GraphMapper<I extends Writa
    * @return Functions that this mapper should do.
    */
   private static MapFunctions determineMapFunctions(
-      Configuration conf,
+      ImmutableClassesGiraphConfiguration conf,
       ZooKeeperManager zkManager) {
-    boolean splitMasterWorker =
-        conf.getBoolean(GiraphJob.SPLIT_MASTER_WORKER,
-            GiraphJob.SPLIT_MASTER_WORKER_DEFAULT);
-    int taskPartition = conf.getInt("mapred.task.partition", -1);
-    boolean zkAlreadyProvided =
-        conf.get(GiraphJob.ZOOKEEPER_LIST) != null;
+    boolean splitMasterWorker = conf.getSplitMasterWorker();
+    int taskPartition = conf.getTaskPartition();
+    boolean zkAlreadyProvided = conf.getZookeeperList() != null;
     MapFunctions functions = MapFunctions.UNKNOWN;
     // What functions should this mapper do?
     if (!splitMasterWorker) {
@@ -241,9 +240,7 @@ public class GraphMapper<I extends Writa
       }
     } else {
       if (zkAlreadyProvided) {
-        int masterCount =
-            conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT,
-                GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT);
+        int masterCount = conf.getZooKeeperServerCount();
         if (taskPartition < masterCount) {
           functions = MapFunctions.MASTER_ONLY;
         } else {
@@ -268,18 +265,17 @@ public class GraphMapper<I extends Writa
     // Setting the default handler for uncaught exceptions.
     Thread.setDefaultUncaughtExceptionHandler(
         new OverrideExceptionHandler());
-    conf = context.getConfiguration();
+    determineClassTypes(context.getConfiguration());
+    conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+        context.getConfiguration());
     // Hadoop security needs this property to be set
     if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
       conf.set("mapreduce.job.credentials.binary",
           System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
     }
-    // set pre-validated generic parameter types into Configuration
-    determineClassTypes(conf);
 
     // Set the log level
-    String logLevel =
-        conf.get(GiraphJob.LOG_LEVEL, GiraphJob.LOG_LEVEL_DEFAULT);
+    String logLevel = conf.getLocalLevel();
     Logger.getRootLogger().setLevel(Level.toLevel(logLevel));
     if (LOG.isInfoEnabled()) {
       LOG.info("setup: Set log level to " + logLevel);
@@ -287,8 +283,7 @@ public class GraphMapper<I extends Writa
 
     // Do some initial setup (possibly starting up a Zookeeper service)
     context.setStatus("setup: Initializing Zookeeper services.");
-    if (!conf.getBoolean(GiraphJob.LOCAL_TEST_MODE,
-        GiraphJob.LOCAL_TEST_MODE_DEFAULT)) {
+    if (!conf.getLocalTestMode()) {
       Path[] fileClassPaths = DistributedCache.getLocalCacheArchives(conf);
       String zkClasspath = null;
       if (fileClassPaths == null) {
@@ -314,11 +309,11 @@ public class GraphMapper<I extends Writa
       if (LOG.isInfoEnabled()) {
         LOG.info("setup: classpath @ " + zkClasspath);
       }
-      conf.set(GiraphJob.ZOOKEEPER_JAR, zkClasspath);
+      context.getConfiguration().set(
+          GiraphConfiguration.ZOOKEEPER_JAR, zkClasspath);
     }
-    String serverPortList =
-        conf.get(GiraphJob.ZOOKEEPER_LIST, "");
-    if (serverPortList.isEmpty()) {
+    String serverPortList = conf.getZookeeperList();
+    if (serverPortList == null) {
       zkManager = new ZooKeeperManager(context);
       context.setStatus("setup: Setting up Zookeeper manager.");
       zkManager.setup();
@@ -334,14 +329,11 @@ public class GraphMapper<I extends Writa
     this.mapFunctions = determineMapFunctions(conf, zkManager);
 
     // Sometimes it takes a while to get multiple ZooKeeper servers up
-    if (conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT,
-        GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT) > 1) {
-      Thread.sleep(GiraphJob.DEFAULT_ZOOKEEPER_INIT_LIMIT *
-          GiraphJob.DEFAULT_ZOOKEEPER_TICK_TIME);
-    }
-    int sessionMsecTimeout =
-        conf.getInt(GiraphJob.ZOOKEEPER_SESSION_TIMEOUT,
-            GiraphJob.ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
+    if (conf.getZooKeeperServerCount() > 1) {
+      Thread.sleep(GiraphConfiguration.DEFAULT_ZOOKEEPER_INIT_LIMIT *
+          GiraphConfiguration.DEFAULT_ZOOKEEPER_TICK_TIME);
+    }
+    int sessionMsecTimeout = conf.getZooKeeperSessionTimeout();
     try {
       if ((mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) ||
           (mapFunctions == MapFunctions.MASTER_ONLY) ||
@@ -475,8 +467,7 @@ public class GraphMapper<I extends Writa
       serviceWorker.getWorkerContext().preSuperstep();
       context.progress();
 
-      boolean useNetty = conf.getBoolean(GiraphJob.USE_NETTY,
-          GiraphJob.USE_NETTY_DEFAULT);
+      boolean useNetty = conf.getUseNetty();
       MessageStoreByPartition<I, M> messageStore = null;
       if (useNetty) {
         messageStore = serviceWorker.getServerData().getCurrentMessageStore();

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java Tue Sep 25 17:40:18 2012
@@ -146,18 +146,18 @@ public abstract class HashMapVertex<I ex
 
   @Override
   public final void readFields(DataInput in) throws IOException {
-    I vertexId = BspUtils.<I>createVertexId(getConf());
+    I vertexId = getConf().createVertexId();
     vertexId.readFields(in);
-    V vertexValue = BspUtils.<V>createVertexValue(getConf());
+    V vertexValue = getConf().createVertexValue();
     vertexValue.readFields(in);
     super.initialize(vertexId, vertexValue);
 
     int numEdges = in.readInt();
     edgeMap = Maps.newHashMapWithExpectedSize(numEdges);
     for (int i = 0; i < numEdges; ++i) {
-      I targetVertexId = BspUtils.<I>createVertexId(getConf());
+      I targetVertexId = getConf().createVertexId();
       targetVertexId.readFields(in);
-      E edgeValue = BspUtils.<E>createEdgeValue(getConf());
+      E edgeValue = getConf().createEdgeValue();
       edgeValue.readFields(in);
       edgeMap.put(targetVertexId, edgeValue);
     }
@@ -165,7 +165,7 @@ public abstract class HashMapVertex<I ex
     int numMessages = in.readInt();
     messageList = Lists.newArrayListWithCapacity(numMessages);
     for (int i = 0; i < numMessages; ++i) {
-      M message = BspUtils.<M>createMessageValue(getConf());
+      M message = getConf().createMessageValue();
       message.readFields(in);
       messageList.add(message);
     }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java Tue Sep 25 17:40:18 2012
@@ -18,8 +18,8 @@
 
 package org.apache.giraph.graph;
 
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Mapper;
 
@@ -41,13 +41,13 @@ import org.apache.hadoop.mapreduce.Mappe
  */
 @SuppressWarnings("rawtypes")
 public abstract class MasterCompute implements MasterAggregatorUsage, Writable,
-    Configurable {
+    ImmutableClassesGiraphConfigurable {
   /** If true, do not do anymore computation on this vertex. */
   private boolean halt = false;
   /** Global graph state **/
   private GraphState graphState;
   /** Configuration */
-  private Configuration conf;
+  private ImmutableClassesGiraphConfiguration conf;
 
   /**
    * Must be defined by user to specify what the master has to do.
@@ -164,12 +164,12 @@ public abstract class MasterCompute impl
   }
 
   @Override
-  public Configuration getConf() {
+  public ImmutableClassesGiraphConfiguration getConf() {
     return conf;
   }
 
   @Override
-  public void setConf(Configuration conf) {
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
     this.conf = conf;
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java Tue Sep 25 17:40:18 2012
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
 
+import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.SuperstepState;
@@ -74,8 +75,8 @@ public class MasterThread<I extends Writ
     this.bspServiceMaster = bspServiceMaster;
     this.context = context;
     superstepCounterOn = context.getConfiguration().getBoolean(
-        GiraphJob.USE_SUPERSTEP_COUNTERS,
-        GiraphJob.USE_SUPERSTEP_COUNTERS_DEFAULT);
+        GiraphConfiguration.USE_SUPERSTEP_COUNTERS,
+        GiraphConfiguration.USE_SUPERSTEP_COUNTERS_DEFAULT);
   }
 
   /**

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java Tue Sep 25 17:40:18 2012
@@ -68,8 +68,7 @@ public abstract class MutableVertex<I ex
   public Vertex<I, V, E, M> instantiateVertex(
       I vertexId, V vertexValue, Map<I, E> edges, Iterable<M> messages) {
     MutableVertex<I, V, E, M> mutableVertex =
-        (MutableVertex<I, V, E, M>) BspUtils
-        .<I, V, E, M>createVertex(getContext().getConfiguration());
+        (MutableVertex<I, V, E, M>) getConf().createVertex();
     mutableVertex.setGraphState(getGraphState());
     mutableVertex.initialize(vertexId, vertexValue, edges, messages);
     return mutableVertex;

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java Tue Sep 25 17:40:18 2012
@@ -92,15 +92,15 @@ public abstract class SimpleMutableVerte
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    I vertexId = BspUtils.<I>createVertexId(getConf());
+    I vertexId = (I) getConf().createVertexId();
     vertexId.readFields(in);
-    V vertexValue = BspUtils.<V>createVertexValue(getConf());
+    V vertexValue = (V) getConf().createVertexValue();
     vertexValue.readFields(in);
 
     int numEdges = in.readInt();
     Map<I, NullWritable> edges = new HashMap<I, NullWritable>(numEdges);
     for (int i = 0; i < numEdges; ++i) {
-      I targetVertexId = BspUtils.<I>createVertexId(getConf());
+      I targetVertexId = (I) getConf().createVertexId();
       targetVertexId.readFields(in);
       edges.put(targetVertexId, NullWritable.get());
     }
@@ -108,7 +108,7 @@ public abstract class SimpleMutableVerte
     int numMessages = in.readInt();
     List<M> messages = new ArrayList<M>(numMessages);
     for (int i = 0; i < numMessages; ++i) {
-      M message = BspUtils.<M>createMessageValue(getConf());
+      M message = (M) getConf().createMessageValue();
       message.readFields(in);
       messages.add(message);
     }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleVertex.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleVertex.java Tue Sep 25 17:40:18 2012
@@ -71,15 +71,15 @@ public abstract class SimpleVertex<I ext
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    I vertexId = BspUtils.<I>createVertexId(getConf());
+    I vertexId = getConf().createVertexId();
     vertexId.readFields(in);
-    V vertexValue = BspUtils.<V>createVertexValue(getConf());
+    V vertexValue = getConf().createVertexValue();
     vertexValue.readFields(in);
 
     int numEdges = in.readInt();
     Map<I, NullWritable> edges = new HashMap<I, NullWritable>(numEdges);
     for (int i = 0; i < numEdges; ++i) {
-      I targetVertexId = BspUtils.<I>createVertexId(getConf());
+      I targetVertexId = getConf().createVertexId();
       targetVertexId.readFields(in);
       edges.put(targetVertexId, NullWritable.get());
     }
@@ -87,7 +87,7 @@ public abstract class SimpleVertex<I ext
     int numMessages = in.readInt();
     List<M> messages = new ArrayList<M>(numMessages);
     for (int i = 0; i < numMessages; ++i) {
-      M message = BspUtils.<M>createMessageValue(getConf());
+      M message = getConf().createMessageValue();
       message.readFields(in);
       messages.add(message);
     }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java Tue Sep 25 17:40:18 2012
@@ -18,8 +18,8 @@
 
 package org.apache.giraph.graph;
 
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -45,7 +45,8 @@ import java.util.Map;
 @SuppressWarnings("rawtypes")
 public abstract class Vertex<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
-    implements WorkerAggregatorUsage, Writable, Configurable {
+    implements WorkerAggregatorUsage, Writable,
+    ImmutableClassesGiraphConfigurable<I, V, E, M> {
   /** Vertex id. */
   private I id;
   /** Vertex value. */
@@ -55,12 +56,14 @@ public abstract class Vertex<I extends W
   /** Global graph state **/
   private GraphState<I, V, E, M> graphState;
   /** Configuration */
-  private Configuration conf;
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
 
 
   /**
-   * This method must be called after instantiation of a vertex with BspUtils
-   * unless deserialization from readFields() is called.
+   * This method must be called after instantiation of a vertex
+   * with ImmutableClassesGiraphConfiguration
+   * unless deserialization from readFields() is
+   * called.
    *
    * @param id Will be the vertex id
    * @param value Will be the vertex value
@@ -328,17 +331,17 @@ public abstract class Vertex<I extends W
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    I vertexId = BspUtils.<I>createVertexId(getConf());
+    I vertexId = (I) getConf().createVertexId();
     vertexId.readFields(in);
-    V vertexValue = BspUtils.<V>createVertexValue(getConf());
+    V vertexValue = (V) getConf().createVertexValue();
     vertexValue.readFields(in);
 
     int numEdges = in.readInt();
     Map<I, E> edges = new HashMap<I, E>(numEdges);
     for (int i = 0; i < numEdges; ++i) {
-      I targetVertexId = BspUtils.<I>createVertexId(getConf());
+      I targetVertexId = (I) getConf().createVertexId();
       targetVertexId.readFields(in);
-      E edgeValue = BspUtils.<E>createEdgeValue(getConf());
+      E edgeValue = (E) getConf().createEdgeValue();
       edgeValue.readFields(in);
       edges.put(targetVertexId, edgeValue);
     }
@@ -346,7 +349,7 @@ public abstract class Vertex<I extends W
     int numMessages = in.readInt();
     List<M> messages = new ArrayList<M>(numMessages);
     for (int i = 0; i < numMessages; ++i) {
-      M message = BspUtils.<M>createMessageValue(getConf());
+      M message = (M) getConf().createMessageValue();
       message.readFields(in);
       messages.add(message);
     }
@@ -375,12 +378,12 @@ public abstract class Vertex<I extends W
   }
 
   @Override
-  public Configuration getConf() {
+  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
     return conf;
   }
 
   @Override
-  public void setConf(Configuration conf) {
+  public void setConf(ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
     this.conf = conf;
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java Tue Sep 25 17:40:18 2012
@@ -24,8 +24,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.json.JSONException;
@@ -44,7 +44,7 @@ import org.json.JSONObject;
 public class VertexMutations<I extends WritableComparable,
     V extends Writable, E extends Writable,
     M extends Writable> implements VertexChanges<I, V, E, M>,
-    Writable, Configurable {
+    Writable, ImmutableClassesGiraphConfigurable {
   /** List of added vertices during the last superstep */
   private final List<Vertex<I, V, E, M>> addedVertexList =
       new ArrayList<Vertex<I, V, E, M>>();
@@ -55,7 +55,7 @@ public class VertexMutations<I extends W
   /** List of removed edges */
   private final List<I> removedEdgeList = new ArrayList<I>();
   /** Configuration */
-  private Configuration conf;
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
 
   /**
    * Copy the vertex mutations.
@@ -85,22 +85,22 @@ public class VertexMutations<I extends W
 
     int addedVertexListSize = input.readInt();
     for (int i = 0; i < addedVertexListSize; ++i) {
-      Vertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
+      Vertex<I, V, E, M> vertex = conf.createVertex();
       vertex.readFields(input);
       addedVertexList.add(vertex);
     }
     removedVertexCount = input.readInt();
     int addedEdgeListSize = input.readInt();
     for (int i = 0; i < addedEdgeListSize; ++i) {
-      I destVertex = BspUtils.<I>createVertexId(conf);
+      I destVertex = conf.createVertexId();
       destVertex.readFields(input);
-      E edgeValue = BspUtils.<E>createEdgeValue(conf);
+      E edgeValue = conf.createEdgeValue();
       edgeValue.readFields(input);
       addedEdgeList.add(new Edge<I, E>(destVertex, edgeValue));
     }
     int removedEdgeListSize = input.readInt();
     for (int i = 0; i < removedEdgeListSize; ++i) {
-      I removedEdge = BspUtils.<I>createVertexId(conf);
+      I removedEdge = conf.createVertexId();
       removedEdge.readFields(input);
       removedEdgeList.add(removedEdge);
     }
@@ -201,12 +201,12 @@ public class VertexMutations<I extends W
   }
 
   @Override
-  public Configuration getConf() {
+  public ImmutableClassesGiraphConfiguration getConf() {
     return conf;
   }
 
   @Override
-  public void setConf(Configuration conf) {
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
     this.conf = conf;
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java Tue Sep 25 17:40:18 2012
@@ -18,8 +18,8 @@
 
 package org.apache.giraph.graph;
 
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
@@ -38,11 +38,12 @@ import java.util.List;
 @SuppressWarnings("rawtypes")
 public class VertexResolver<I extends WritableComparable, V extends Writable,
     E extends Writable, M extends Writable>
-    implements BasicVertexResolver<I, V, E, M>, Configurable {
+    implements BasicVertexResolver<I, V, E, M>,
+    ImmutableClassesGiraphConfigurable<I, V, E, M> {
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(VertexResolver.class);
   /** Configuration */
-  private Configuration conf = null;
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf = null;
   /** Stored graph state */
   private GraphState<I, V, E, M> graphState;
 
@@ -86,7 +87,7 @@ public class VertexResolver<I extends Wr
       if (vertex == null && hasMessages) {
         vertex = instantiateVertex();
         vertex.initialize(vertexId,
-            BspUtils.<V>createVertexValue(getConf()),
+            getConf().createVertexValue(),
             null,
             null);
       }
@@ -114,19 +115,18 @@ public class VertexResolver<I extends Wr
 
   @Override
   public Vertex<I, V, E, M> instantiateVertex() {
-    Vertex<I, V, E, M> vertex =
-        BspUtils.<I, V, E, M>createVertex(getConf());
+    Vertex<I, V, E, M> vertex = getConf().createVertex();
     vertex.setGraphState(graphState);
     return vertex;
   }
 
   @Override
-  public Configuration getConf() {
+  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
     return conf;
   }
 
   @Override
-  public void setConf(Configuration conf) {
+  public void setConf(ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
     this.conf = conf;
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java Tue Sep 25 17:40:18 2012
@@ -22,17 +22,18 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.WorkerInfo;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Basic partition owner, can be subclassed for more complicated partition
  * owner implementations.
  */
-public class BasicPartitionOwner implements PartitionOwner, Configurable {
+public class BasicPartitionOwner implements PartitionOwner,
+    ImmutableClassesGiraphConfigurable {
   /** Configuration */
-  private Configuration conf;
+  private ImmutableClassesGiraphConfiguration conf;
   /** Partition id */
   private int partitionId = -1;
   /** Owning worker information */
@@ -145,12 +146,12 @@ public class BasicPartitionOwner impleme
   }
 
   @Override
-  public Configuration getConf() {
+  public ImmutableClassesGiraphConfiguration getConf() {
     return conf;
   }
 
   @Override
-  public void setConf(Configuration conf) {
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
     this.conf = conf;
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java Tue Sep 25 17:40:18 2012
@@ -18,10 +18,9 @@
 
 package org.apache.giraph.graph.partition;
 
-import org.apache.giraph.graph.BspUtils;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
@@ -68,7 +67,7 @@ public class DiskBackedPartitionStore<I 
   /** Directory on the local file system for storing out-of-core partitions. */
   private final String basePath;
   /** Configuration. */
-  private final Configuration conf;
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
   /** Slot for loading out-of-core partitions. */
   private Partition<I, V, E, M> loadedPartition;
   /** Locks for accessing and modifying partitions. */
@@ -80,15 +79,16 @@ public class DiskBackedPartitionStore<I 
    *
    * @param conf Configuration
    */
-  public DiskBackedPartitionStore(Configuration conf) {
+  public DiskBackedPartitionStore(
+      ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
     this.conf = conf;
     // We must be able to hold at least one partition in memory
     maxInMemoryPartitions = Math.max(1,
-        conf.getInt(GiraphJob.MAX_PARTITIONS_IN_MEMORY,
-            GiraphJob.MAX_PARTITIONS_IN_MEMORY_DEFAULT));
+        conf.getInt(GiraphConfiguration.MAX_PARTITIONS_IN_MEMORY,
+            GiraphConfiguration.MAX_PARTITIONS_IN_MEMORY_DEFAULT));
     basePath = conf.get("mapred.job.id", "Unknown Job") +
-        conf.get(GiraphJob.PARTITIONS_DIRECTORY,
-            GiraphJob.PARTITIONS_DIRECTORY_DEFAULT);
+        conf.get(GiraphConfiguration.PARTITIONS_DIRECTORY,
+            GiraphConfiguration.PARTITIONS_DIRECTORY_DEFAULT);
   }
 
   /**
@@ -162,7 +162,7 @@ public class DiskBackedPartitionStore<I 
         new BufferedInputStream(new FileInputStream(file)));
     int numVertices = onDiskPartitions.get(partitionId);
     for (int i = 0; i < numVertices; ++i) {
-      Vertex<I, V, E, M> vertex = BspUtils.<I, V, E, M>createVertex(conf);
+      Vertex<I, V, E, M> vertex = conf.createVertex();
       vertex.readFields(inputStream);
       partition.putVertex(vertex);
     }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java Tue Sep 25 17:40:18 2012
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.graph.partition;
 
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -31,7 +32,8 @@ import org.apache.hadoop.io.WritableComp
  */
 @SuppressWarnings("rawtypes")
 public interface GraphPartitionerFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
+    V extends Writable, E extends Writable, M extends Writable> extends
+    ImmutableClassesGiraphConfigurable {
   /**
    * Create the {@link MasterGraphPartitioner} used by the master.
    * Instantiated once by the master and reused.

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java Tue Sep 25 17:40:18 2012
@@ -23,8 +23,8 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.WorkerInfo;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
@@ -59,7 +59,7 @@ public class HashMasterPartitioner<I ext
    */
   private static final int MAX_PARTTIONS = 1024 * 1024 / 350;
   /** Provided configuration */
-  private Configuration conf;
+  private ImmutableClassesGiraphConfiguration conf;
   /** Specified partition count (overrides calculation) */
   private final int userPartitionCount;
   /** Partition count (calculated in createInitialPartitionOwners) */
@@ -72,7 +72,7 @@ public class HashMasterPartitioner<I ext
    *
    *@param conf Configuration used.
    */
-  public HashMasterPartitioner(Configuration conf) {
+  public HashMasterPartitioner(ImmutableClassesGiraphConfiguration conf) {
     this.conf = conf;
     userPartitionCount = conf.getInt(USER_PARTITION_COUNT,
         DEFAULT_USER_PARTITION_COUNT);

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java Tue Sep 25 17:40:18 2012
@@ -18,8 +18,7 @@
 
 package org.apache.giraph.graph.partition;
 
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -35,9 +34,9 @@ import org.apache.hadoop.io.WritableComp
 @SuppressWarnings("rawtypes")
 public class HashPartitionerFactory<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
-    implements Configurable, GraphPartitionerFactory<I, V, E, M> {
+    implements GraphPartitionerFactory<I, V, E, M> {
   /** Saved configuration */
-  private Configuration conf;
+  private ImmutableClassesGiraphConfiguration conf;
 
   @Override
   public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
@@ -50,12 +49,12 @@ public class HashPartitionerFactory<I ex
   }
 
   @Override
-  public Configuration getConf() {
+  public ImmutableClassesGiraphConfiguration getConf() {
     return conf;
   }
 
   @Override
-  public void setConf(Configuration conf) {
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
     this.conf = conf;
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java Tue Sep 25 17:40:18 2012
@@ -18,8 +18,7 @@
 
 package org.apache.giraph.graph.partition;
 
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -35,9 +34,9 @@ import org.apache.hadoop.io.WritableComp
 @SuppressWarnings("rawtypes")
 public class HashRangePartitionerFactory<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
-    implements Configurable, GraphPartitionerFactory<I, V, E, M> {
+    implements GraphPartitionerFactory<I, V, E, M> {
   /** Saved configuration */
-  private Configuration conf;
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
 
   @Override
   public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
@@ -50,12 +49,12 @@ public class HashRangePartitionerFactory
   }
 
   @Override
-  public Configuration getConf() {
+  public ImmutableClassesGiraphConfiguration getConf() {
     return conf;
   }
 
   @Override
-  public void setConf(Configuration conf) {
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
     this.conf = conf;
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java Tue Sep 25 17:40:18 2012
@@ -18,10 +18,9 @@
 
 package org.apache.giraph.graph.partition;
 
-import org.apache.giraph.graph.BspUtils;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -48,7 +47,7 @@ public class Partition<I extends Writabl
     V extends Writable, E extends Writable, M extends Writable>
     implements Writable {
   /** Configuration from the worker */
-  private final Configuration conf;
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
   /** Partition id */
   private final int id;
   /** Vertex map for this range (keyed by index) */
@@ -60,11 +59,12 @@ public class Partition<I extends Writabl
    * @param conf Configuration.
    * @param id Partition id.
    */
-  public Partition(Configuration conf, int id) {
+  public Partition(ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
+                   int id) {
     this.conf = conf;
     this.id = id;
-    if (conf.getBoolean(GiraphJob.USE_OUT_OF_CORE_MESSAGES,
-        GiraphJob.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
+    if (conf.getBoolean(GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES,
+        GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
       vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
     } else {
       vertexMap = Maps.newConcurrentMap();
@@ -153,8 +153,7 @@ public class Partition<I extends Writabl
   public void readFields(DataInput input) throws IOException {
     int vertices = input.readInt();
     for (int i = 0; i < vertices; ++i) {
-      Vertex<I, V, E, M> vertex =
-        BspUtils.<I, V, E, M>createVertex(conf);
+      Vertex<I, V, E, M> vertex = conf.createVertex();
       vertex.readFields(input);
       if (vertexMap.put(vertex.getId(),
           (Vertex<I, V, E, M>) vertex) != null) {

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java Tue Sep 25 17:40:18 2012
@@ -22,7 +22,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.giraph.graph.BspUtils;
 import org.apache.hadoop.io.WritableComparable;
 
 /**
@@ -63,7 +62,7 @@ public class RangePartitionOwner<I exten
   @Override
   public void readFields(DataInput input) throws IOException {
     super.readFields(input);
-    maxIndex = BspUtils.<I>createVertexId(getConf());
+    maxIndex = (I) getConf().createVertexId();
     maxIndex.readFields(input);
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java Tue Sep 25 17:40:18 2012
@@ -22,9 +22,8 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.giraph.graph.BspUtils;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -36,7 +35,7 @@ import org.apache.hadoop.io.WritableComp
  */
 @SuppressWarnings("rawtypes")
 public class RangeSplitHint<I extends WritableComparable>
-    implements Writable, Configurable {
+    implements Writable, ImmutableClassesGiraphConfigurable {
   /** Hinted split index */
   private I splitIndex;
   /** Number of vertices in this range before the split */
@@ -44,11 +43,11 @@ public class RangeSplitHint<I extends Wr
   /** Number of vertices in this range after the split */
   private long postSplitVertexCount;
   /** Configuration */
-  private Configuration conf;
+  private ImmutableClassesGiraphConfiguration<I, ?, ?, ?> conf;
 
   @Override
   public void readFields(DataInput input) throws IOException {
-    splitIndex = BspUtils.<I>createVertexId(conf);
+    splitIndex = conf.createVertexId();
     splitIndex.readFields(input);
     preSplitVertexCount = input.readLong();
     postSplitVertexCount = input.readLong();
@@ -62,12 +61,12 @@ public class RangeSplitHint<I extends Wr
   }
 
   @Override
-  public Configuration getConf() {
+  public ImmutableClassesGiraphConfiguration getConf() {
     return conf;
   }
 
   @Override
-  public void setConf(Configuration conf) {
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
     this.conf = conf;
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java Tue Sep 25 17:40:18 2012
@@ -18,8 +18,8 @@
 
 package org.apache.giraph.graph.partition;
 
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -43,14 +43,15 @@ public class SimplePartitionStore<I exte
   private final ConcurrentMap<Integer, Partition<I, V, E, M>> partitions =
       Maps.newConcurrentMap();
   /** Configuration. */
-  private final Configuration conf;
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
 
   /**
    * Constructor.
    *
    * @param conf Configuration
    */
-  public SimplePartitionStore(Configuration conf) {
+  public SimplePartitionStore(
+      ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
     this.conf = conf;
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java Tue Sep 25 17:40:18 2012
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.WorkerInfo;
 import org.apache.giraph.graph.partition.BasicPartitionOwner;
 import org.apache.giraph.graph.partition.HashMasterPartitioner;
@@ -29,7 +30,6 @@ import org.apache.giraph.graph.partition
 import org.apache.giraph.graph.partition.MasterGraphPartitioner;
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.giraph.graph.partition.PartitionStats;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
@@ -70,7 +70,7 @@ public class SuperstepHashPartitionerFac
      *
      * @param conf Configuration to be stored.
      */
-    public SuperstepMasterPartition(Configuration conf) {
+    public SuperstepMasterPartition(ImmutableClassesGiraphConfiguration conf) {
       super(conf);
     }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java Tue Sep 25 17:40:18 2012
@@ -18,8 +18,6 @@
 
 package org.apache.giraph.io;
 
-import org.apache.giraph.graph.BspUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -66,14 +64,11 @@ public class JsonBase64VertexInputFormat
   protected class JsonBase64VertexReader extends
     TextVertexReaderFromEachLineProcessed<JSONObject> {
 
-    /** Cached configuration */
-    private Configuration conf;
 
     @Override
     public void initialize(InputSplit inputSplit, TaskAttemptContext context)
       throws IOException, InterruptedException {
       super.initialize(inputSplit, context);
-      conf = context.getConfiguration();
     }
 
     @Override
@@ -93,7 +88,7 @@ public class JsonBase64VertexInputFormat
             vertexObject.getString(JsonBase64VertexFormat.VERTEX_ID_KEY));
         DataInput input = new DataInputStream(
             new ByteArrayInputStream(decodedWritable));
-        I vertexId = BspUtils.<I>createVertexId(conf);
+        I vertexId = getConf().createVertexId();
         vertexId.readFields(input);
         return vertexId;
       } catch (JSONException e) {
@@ -109,7 +104,7 @@ public class JsonBase64VertexInputFormat
             vertexObject.getString(JsonBase64VertexFormat.VERTEX_VALUE_KEY));
         DataInputStream input = new DataInputStream(
             new ByteArrayInputStream(decodedWritable));
-        V vertexValue = BspUtils.<V>createVertexValue(conf);
+        V vertexValue = getConf().createVertexValue();
         vertexValue.readFields(input);
         return vertexValue;
       } catch (JSONException e) {
@@ -139,11 +134,9 @@ public class JsonBase64VertexInputFormat
         }
         DataInputStream input = new DataInputStream(
             new ByteArrayInputStream(decodedWritable));
-        I targetVertexId =
-            BspUtils.<I>createVertexId(getContext().getConfiguration());
+        I targetVertexId = getConf().createVertexId();
         targetVertexId.readFields(input);
-        E edgeValue =
-            BspUtils.<E>createEdgeValue(getContext().getConfiguration());
+        E edgeValue = getConf().createEdgeValue();
         edgeValue.readFields(input);
         edgeMap.put(targetVertexId, edgeValue);
       }

Modified: giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java Tue Sep 25 17:40:18 2012
@@ -18,12 +18,11 @@
 
 package org.apache.giraph.io;
 
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.BspInputSplit;
-import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexReader;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
@@ -98,7 +97,7 @@ public class PseudoRandomVertexInputForm
     /** BspInputSplit (used only for index). */
     private BspInputSplit bspInputSplit;
     /** Saved configuration */
-    private Configuration configuration;
+    private ImmutableClassesGiraphConfiguration configuration;
 
     /**
      * Default constructor for reflection.
@@ -109,7 +108,8 @@ public class PseudoRandomVertexInputForm
     @Override
     public void initialize(InputSplit inputSplit,
         TaskAttemptContext context) throws IOException {
-      configuration = context.getConfiguration();
+      configuration = new ImmutableClassesGiraphConfiguration(
+          context.getConfiguration());
       aggregateVertices =
         configuration.getLong(
           PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 0);
@@ -152,7 +152,7 @@ public class PseudoRandomVertexInputForm
     public Vertex<LongWritable, DoubleWritable, DoubleWritable, M>
     getCurrentVertex() throws IOException, InterruptedException {
       Vertex<LongWritable, DoubleWritable, DoubleWritable, M>
-      vertex = BspUtils.createVertex(configuration);
+      vertex = configuration.createVertex();
       long vertexId = startingVertexId + verticesRead;
       // Seed on the vertex id to keep the vertex data the same when
       // on different number of workers, but other parameters are the
@@ -174,9 +174,9 @@ public class PseudoRandomVertexInputForm
       ++verticesRead;
       if (LOG.isTraceEnabled()) {
         LOG.trace("next: Return vertexId=" +
-                  vertex.getId().get() +
-                  ", vertexValue=" + vertex.getValue() +
-                  ", edges=" + vertex.getEdges());
+            vertex.getId().get() +
+            ", vertexValue=" + vertex.getValue() +
+            ", edges=" + vertex.getEdges());
       }
       return vertex;
     }

Modified: giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java Tue Sep 25 17:40:18 2012
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.io;
 
-import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexReader;
@@ -94,11 +94,15 @@ public abstract class TextVertexInputFor
     private RecordReader<LongWritable, Text> lineRecordReader;
     /** Context passed to initialize */
     private TaskAttemptContext context;
+    /** Cached configuration */
+    private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
 
     @Override
     public void initialize(InputSplit inputSplit, TaskAttemptContext context)
       throws IOException, InterruptedException {
       this.context = context;
+      conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+          context.getConfiguration());
       lineRecordReader = createLineRecordReader(inputSplit, context);
       lineRecordReader.initialize(inputSplit, context);
     }
@@ -151,6 +155,15 @@ public abstract class TextVertexInputFor
     protected TaskAttemptContext getContext() {
       return context;
     }
+
+    /**
+     * Get the configuration.
+     *
+     * @return Configuration for this reader
+     */
+    protected ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+      return conf;
+    }
   }
 
   /**
@@ -164,8 +177,7 @@ public abstract class TextVertexInputFor
     public final Vertex<I, V, E, M> getCurrentVertex() throws IOException,
     InterruptedException {
       Text line = getRecordReader().getCurrentValue();
-      Vertex<I, V, E, M> vertex = BspUtils
-          .<I, V, E, M>createVertex(getContext().getConfiguration());
+      Vertex<I, V, E, M> vertex = getConf().createVertex();
       vertex.initialize(getId(line), getValue(line), getEdges(line), null);
       return vertex;
     }
@@ -234,8 +246,7 @@ public abstract class TextVertexInputFor
       Text line = getRecordReader().getCurrentValue();
       Vertex<I, V, E, M> vertex;
       T processed = preprocessLine(line);
-      vertex = BspUtils
-          .<I, V, E, M>createVertex(getContext().getConfiguration());
+      vertex = getConf().createVertex();
       vertex.initialize(getId(processed), getValue(processed),
           getEdges(processed), null);
       return vertex;
@@ -322,8 +333,7 @@ public abstract class TextVertexInputFor
       try {
         processed = preprocessLine(line);
         Configuration conf = getContext().getConfiguration();
-        vertex = BspUtils
-            .<I, V, E, M>createVertex(conf);
+        vertex = getConf().createVertex();
         vertex.initialize(getId(processed), getValue(processed),
             getEdges(processed), null);
       } catch (IOException e) {