You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/03/28 01:02:29 UTC

[2/3] GIRAPH-587: Refactor configuration options (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 7882d06..c5b9b93 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -17,72 +17,147 @@
  */
 package org.apache.giraph.conf;
 
+import org.apache.giraph.aggregators.AggregatorWriter;
+import org.apache.giraph.aggregators.TextAggregatorWriter;
+import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.edge.VertexEdges;
+import org.apache.giraph.graph.DefaultVertexResolver;
+import org.apache.giraph.graph.DefaultVertexValueFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.graph.VertexValueFactory;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.job.DefaultJobObserver;
+import org.apache.giraph.job.GiraphJobObserver;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.master.MasterCompute;
+import org.apache.giraph.master.MasterObserver;
+import org.apache.giraph.partition.DefaultPartitionContext;
+import org.apache.giraph.partition.GraphPartitionerFactory;
+import org.apache.giraph.partition.HashPartitionerFactory;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionContext;
+import org.apache.giraph.partition.SimplePartition;
+import org.apache.giraph.worker.DefaultWorkerContext;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.worker.WorkerObserver;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 /**
  * Constants used all over Giraph for configuration.
  */
 // CHECKSTYLE: stop InterfaceIsTypeCheck
 public interface GiraphConstants {
+  /** 1KB in bytes */
+  int ONE_KB = 1024;
+
   /** Vertex class - required */
-  String VERTEX_CLASS = "giraph.vertexClass";
+  ClassConfOption<Vertex> VERTEX_CLASS =
+      ClassConfOption.create("giraph.vertexClass", null, Vertex.class);
   /** Vertex value factory class - optional */
-  String VERTEX_VALUE_FACTORY_CLASS = "giraph.vertexValueFactoryClass";
+  ClassConfOption<VertexValueFactory> VERTEX_VALUE_FACTORY_CLASS =
+      ClassConfOption.create("giraph.vertexValueFactoryClass",
+          DefaultVertexValueFactory.class, VertexValueFactory.class);
   /** Vertex edges class - optional */
-  String VERTEX_EDGES_CLASS = "giraph.vertexEdgesClass";
+  ClassConfOption<VertexEdges> VERTEX_EDGES_CLASS =
+      ClassConfOption.create("giraph.vertexEdgesClass", ByteArrayEdges.class,
+          VertexEdges.class);
   /** Vertex edges class to be used during edge input only - optional */
-  String INPUT_VERTEX_EDGES_CLASS = "giraph.inputVertexEdgesClass";
+  ClassConfOption<VertexEdges> INPUT_VERTEX_EDGES_CLASS =
+      ClassConfOption.create("giraph.inputVertexEdgesClass",
+          ByteArrayEdges.class, VertexEdges.class);
 
   /** Class for Master - optional */
-  String MASTER_COMPUTE_CLASS = "giraph.masterComputeClass";
+  ClassConfOption<MasterCompute> MASTER_COMPUTE_CLASS =
+      ClassConfOption.create("giraph.masterComputeClass",
+          DefaultMasterCompute.class, MasterCompute.class);
   /** Classes for Master Observer - optional */
-  String MASTER_OBSERVER_CLASSES = "giraph.master.observers";
+  ClassConfOption<MasterObserver> MASTER_OBSERVER_CLASSES =
+      ClassConfOption.create("giraph.master.observers",
+          null, MasterObserver.class);
   /** Classes for Worker Observer - optional */
-  String WORKER_OBSERVER_CLASSES = "giraph.worker.observers";
+  ClassConfOption<WorkerObserver> WORKER_OBSERVER_CLASSES =
+      ClassConfOption.create("giraph.worker.observers", null,
+          WorkerObserver.class);
   /** Vertex combiner class - optional */
-  String VERTEX_COMBINER_CLASS = "giraph.combinerClass";
+  ClassConfOption<Combiner> VERTEX_COMBINER_CLASS =
+      ClassConfOption.create("giraph.combinerClass", null, Combiner.class);
   /** Vertex resolver class - optional */
-  String VERTEX_RESOLVER_CLASS = "giraph.vertexResolverClass";
+  ClassConfOption<VertexResolver> VERTEX_RESOLVER_CLASS =
+      ClassConfOption.create("giraph.vertexResolverClass",
+          DefaultVertexResolver.class, VertexResolver.class);
+
   /**
    * Option of whether to create vertexes that were not existent before but
    * received messages
    */
-  String RESOLVER_CREATE_VERTEX_ON_MSGS =
-      "giraph.vertex.resolver.create.on.msgs";
+  BooleanConfOption RESOLVER_CREATE_VERTEX_ON_MSGS =
+      new BooleanConfOption("giraph.vertex.resolver.create.on.msgs", true);
   /** Graph partitioner factory class - optional */
-  String GRAPH_PARTITIONER_FACTORY_CLASS =
-      "giraph.graphPartitionerFactoryClass";
+  ClassConfOption<GraphPartitionerFactory> GRAPH_PARTITIONER_FACTORY_CLASS =
+      ClassConfOption.create("giraph.graphPartitionerFactoryClass",
+          HashPartitionerFactory.class, GraphPartitionerFactory.class);
 
   /** Observer class to watch over job status - optional */
-  String JOB_OBSERVER_CLASS = "giraph.jobObserverClass";
+  ClassConfOption<GiraphJobObserver> JOB_OBSERVER_CLASS =
+      ClassConfOption.create("giraph.jobObserverClass",
+          DefaultJobObserver.class, GiraphJobObserver.class);
 
   // At least one of the input format classes is required.
   /** VertexInputFormat class */
-  String VERTEX_INPUT_FORMAT_CLASS = "giraph.vertexInputFormatClass";
+  ClassConfOption<VertexInputFormat> VERTEX_INPUT_FORMAT_CLASS =
+      ClassConfOption.create("giraph.vertexInputFormatClass", null,
+          VertexInputFormat.class);
   /** EdgeInputFormat class */
-  String EDGE_INPUT_FORMAT_CLASS = "giraph.edgeInputFormatClass";
+  ClassConfOption<EdgeInputFormat> EDGE_INPUT_FORMAT_CLASS =
+      ClassConfOption.create("giraph.edgeInputFormatClass", null,
+          EdgeInputFormat.class);
 
   /** VertexOutputFormat class */
-  String VERTEX_OUTPUT_FORMAT_CLASS = "giraph.vertexOutputFormatClass";
+  ClassConfOption<VertexOutputFormat> VERTEX_OUTPUT_FORMAT_CLASS =
+      ClassConfOption.create("giraph.vertexOutputFormatClass", null,
+          VertexOutputFormat.class);
 
   /** Output Format Path (for Giraph-on-YARN) */
   String GIRAPH_OUTPUT_DIR = "giraph.output.dir";
 
   /** Vertex index class */
-  String VERTEX_ID_CLASS = "giraph.vertexIdClass";
+  ClassConfOption<WritableComparable> VERTEX_ID_CLASS =
+      ClassConfOption.create("giraph.vertexIdClass", null,
+          WritableComparable.class);
   /** Vertex value class */
-  String VERTEX_VALUE_CLASS = "giraph.vertexValueClass";
+  ClassConfOption<Writable> VERTEX_VALUE_CLASS =
+      ClassConfOption.create("giraph.vertexValueClass", null, Writable.class);
   /** Edge value class */
-  String EDGE_VALUE_CLASS = "giraph.edgeValueClass";
+  ClassConfOption<Writable> EDGE_VALUE_CLASS =
+      ClassConfOption.create("giraph.edgeValueClass", null, Writable.class);
   /** Message value class */
-  String MESSAGE_VALUE_CLASS = "giraph.messageValueClass";
+  ClassConfOption<Writable> MESSAGE_VALUE_CLASS =
+      ClassConfOption.create("giraph.messageValueClass", null, Writable.class);
   /** Partition context class */
-  String PARTITION_CONTEXT_CLASS = "giraph.partitionContextClass";
+  ClassConfOption<PartitionContext> PARTITION_CONTEXT_CLASS =
+      ClassConfOption.create("giraph.partitionContextClass",
+          DefaultPartitionContext.class, PartitionContext.class);
   /** Worker context class */
-  String WORKER_CONTEXT_CLASS = "giraph.workerContextClass";
+  ClassConfOption<WorkerContext> WORKER_CONTEXT_CLASS =
+      ClassConfOption.create("giraph.workerContextClass",
+          DefaultWorkerContext.class, WorkerContext.class);
   /** AggregatorWriter class - optional */
-  String AGGREGATOR_WRITER_CLASS = "giraph.aggregatorWriterClass";
+  ClassConfOption<AggregatorWriter> AGGREGATOR_WRITER_CLASS =
+      ClassConfOption.create("giraph.aggregatorWriterClass",
+          TextAggregatorWriter.class, AggregatorWriter.class);
 
   /** Partition class - optional */
-  String PARTITION_CLASS = "giraph.partitionClass";
+  ClassConfOption<Partition> PARTITION_CLASS =
+      ClassConfOption.create("giraph.partitionClass", SimplePartition.class,
+          Partition.class);
 
   /**
    * Minimum number of simultaneous workers before this job can run (int)
@@ -97,54 +172,42 @@ public interface GiraphConstants {
    * Separate the workers and the master tasks.  This is required
    * to support dynamic recovery. (boolean)
    */
-  String SPLIT_MASTER_WORKER = "giraph.SplitMasterWorker";
-  /**
-   * Default on whether to separate the workers and the master tasks.
-   * Needs to be "true" to support dynamic recovery.
-   */
-  boolean SPLIT_MASTER_WORKER_DEFAULT = true;
+  BooleanConfOption SPLIT_MASTER_WORKER =
+      new BooleanConfOption("giraph.SplitMasterWorker", true);
 
   /** Indicates whether this job is run in an internal unit test */
-  String LOCAL_TEST_MODE = "giraph.localTestMode";
-
-  /** not in local test mode per default */
-  boolean LOCAL_TEST_MODE_DEFAULT = false;
+  BooleanConfOption LOCAL_TEST_MODE =
+      new BooleanConfOption("giraph.localTestMode", false);
 
   /** Override the Hadoop log level and set the desired log level. */
-  String LOG_LEVEL = "giraph.logLevel";
-  /** Default log level is INFO (same as Hadoop) */
-  String LOG_LEVEL_DEFAULT = "info";
+  StrConfOption LOG_LEVEL = new StrConfOption("giraph.logLevel", "info");
 
   /** Use thread level debugging? */
-  String LOG_THREAD_LAYOUT = "giraph.logThreadLayout";
-  /** Default to not use thread-level debugging */
-  boolean LOG_THREAD_LAYOUT_DEFAULT = false;
+  BooleanConfOption LOG_THREAD_LAYOUT =
+      new BooleanConfOption("giraph.logThreadLayout", false);
 
   /** Configuration key to enable jmap printing */
-  String JMAP_ENABLE = "giraph.jmap.histo.enable";
-  /** Default value for enabling jmap */
-  boolean JMAP_ENABLE_DEFAULT = false;
+  BooleanConfOption JMAP_ENABLE =
+      new BooleanConfOption("giraph.jmap.histo.enable", false);
 
   /** Configuration key for msec to sleep between calls */
-  String JMAP_SLEEP_MILLIS = "giraph.jmap.histo.msec";
-  /** Default msec to sleep between calls */
-  int JMAP_SLEEP_MILLIS_DEFAULT = 30000;
+  IntConfOption JMAP_SLEEP_MILLIS =
+      new IntConfOption("giraph.jmap.histo.msec", SECONDS.toMillis(30));
 
   /** Configuration key for how many lines to print */
-  String JMAP_PRINT_LINES = "giraph.jmap.histo.print_lines";
-  /** Default lines of output to print */
-  int JMAP_PRINT_LINES_DEFAULT = 30;
+  IntConfOption JMAP_PRINT_LINES =
+      new IntConfOption("giraph.jmap.histo.print_lines", 30);
 
   /**
    * Minimum percent of the maximum number of workers that have responded
    * in order to continue progressing. (float)
    */
-  String MIN_PERCENT_RESPONDED = "giraph.minPercentResponded";
-  /** Default 100% response rate for workers */
-  float MIN_PERCENT_RESPONDED_DEFAULT = 100.0f;
+  FloatConfOption MIN_PERCENT_RESPONDED =
+      new FloatConfOption("giraph.minPercentResponded", 100.0f);
 
   /** Enable the Metrics system **/
-  String METRICS_ENABLE = "giraph.metrics.enable";
+  BooleanConfOption METRICS_ENABLE =
+      new BooleanConfOption("giraph.metrics.enable", false);
 
   /**
    *  ZooKeeper comma-separated list (if not set,
@@ -153,24 +216,20 @@ public interface GiraphConstants {
   String ZOOKEEPER_LIST = "giraph.zkList";
 
   /** ZooKeeper session millisecond timeout */
-  String ZOOKEEPER_SESSION_TIMEOUT = "giraph.zkSessionMsecTimeout";
-  /** Default Zookeeper session millisecond timeout */
-  int ZOOKEEPER_SESSION_TIMEOUT_DEFAULT = 60 * 1000;
+  IntConfOption ZOOKEEPER_SESSION_TIMEOUT =
+      new IntConfOption("giraph.zkSessionMsecTimeout", MINUTES.toMillis(1));
 
   /** Polling interval to check for the ZooKeeper server data */
-  String ZOOKEEPER_SERVERLIST_POLL_MSECS = "giraph.zkServerlistPollMsecs";
-  /** Default polling interval to check for the ZooKeeper server data */
-  int ZOOKEEPER_SERVERLIST_POLL_MSECS_DEFAULT = 3 * 1000;
+  IntConfOption ZOOKEEPER_SERVERLIST_POLL_MSECS =
+      new IntConfOption("giraph.zkServerlistPollMsecs", SECONDS.toMillis(3));
 
   /** Number of nodes (not tasks) to run Zookeeper on */
-  String ZOOKEEPER_SERVER_COUNT = "giraph.zkServerCount";
-  /** Default number of nodes to run Zookeeper on */
-  int ZOOKEEPER_SERVER_COUNT_DEFAULT = 1;
+  IntConfOption ZOOKEEPER_SERVER_COUNT =
+      new IntConfOption("giraph.zkServerCount", 1);
 
   /** ZooKeeper port to use */
-  String ZOOKEEPER_SERVER_PORT = "giraph.zkServerPort";
-  /** Default ZooKeeper port to use */
-  int ZOOKEEPER_SERVER_PORT_DEFAULT = 22181;
+  IntConfOption ZOOKEEPER_SERVER_PORT =
+      new IntConfOption("giraph.zkServerPort", 22181);
 
   /** Location of the ZooKeeper jar - Used internally, not meant for users */
   String ZOOKEEPER_JAR = "giraph.zkJar";
@@ -179,187 +238,130 @@ public interface GiraphConstants {
   String ZOOKEEPER_DIR = "giraph.zkDir";
 
   /** Max attempts for handling ZooKeeper connection loss */
-  String ZOOKEEPER_OPS_MAX_ATTEMPTS = "giraph.zkOpsMaxAttempts";
-  /** Default of 3 attempts for handling ZooKeeper connection loss */
-  int ZOOKEEPER_OPS_MAX_ATTEMPTS_DEFAULT = 3;
+  IntConfOption ZOOKEEPER_OPS_MAX_ATTEMPTS =
+      new IntConfOption("giraph.zkOpsMaxAttempts", 3);
 
   /**
-   * Msecs to wait before retrying a failed ZooKeeper op due to connection
-   * loss.
+   * Msecs to wait before retrying a failed ZooKeeper op due to connection loss.
    */
-  String ZOOKEEPER_OPS_RETRY_WAIT_MSECS = "giraph.zkOpsRetryWaitMsecs";
-  /**
-   * Default to wait 5 seconds before retrying a failed ZooKeeper op due to
-   * connection loss.
-   */
-  int ZOOKEEPER_OPS_RETRY_WAIT_MSECS_DEFAULT = 5 * 1000;
+  IntConfOption ZOOKEEPER_OPS_RETRY_WAIT_MSECS =
+      new IntConfOption("giraph.zkOpsRetryWaitMsecs", SECONDS.toMillis(5));
 
   /** TCP backlog (defaults to number of workers) */
-  String TCP_BACKLOG = "giraph.tcpBacklog";
-  /**
-   * Default TCP backlog default if the number of workers is not specified
-   * (i.e unittests)
-   */
-  int TCP_BACKLOG_DEFAULT = 1;
+  IntConfOption TCP_BACKLOG = new IntConfOption("giraph.tcpBacklog", 1);
 
-  /** How big to make the default buffer? */
-  String NETTY_REQUEST_ENCODER_BUFFER_SIZE =
-      "giraph.nettyRequestEncoderBufferSize";
-  /** Start with 32K */
-  int NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT = 32 * 1024;
+  /** How big to make the encoder buffer? */
+  IntConfOption NETTY_REQUEST_ENCODER_BUFFER_SIZE =
+      new IntConfOption("giraph.nettyRequestEncoderBufferSize", 32 * ONE_KB);
 
   /** Whether or not netty request encoder should use direct byte buffers */
-  String NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS =
-      "giraph.nettyRequestEncoderUseDirectBuffers";
-  /**
-   * By default don't use direct buffers,
-   * since jobs can take more than allowed heap memory in that case
-   */
-  boolean NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS_DEFAULT = false;
+  BooleanConfOption NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS =
+      new BooleanConfOption("giraph.nettyRequestEncoderUseDirectBuffers",
+                            false);
 
   /** Netty client threads */
-  String NETTY_CLIENT_THREADS = "giraph.nettyClientThreads";
-  /** Default is 4 */
-  int NETTY_CLIENT_THREADS_DEFAULT = 4;
+  IntConfOption NETTY_CLIENT_THREADS =
+      new IntConfOption("giraph.nettyClientThreads", 4);
 
   /** Netty server threads */
-  String NETTY_SERVER_THREADS = "giraph.nettyServerThreads";
-  /** Default is 16 */
-  int NETTY_SERVER_THREADS_DEFAULT = 16;
+  IntConfOption NETTY_SERVER_THREADS =
+      new IntConfOption("giraph.nettyServerThreads", 16);
 
   /** Use the execution handler in netty on the client? */
-  String NETTY_CLIENT_USE_EXECUTION_HANDLER =
-      "giraph.nettyClientUseExecutionHandler";
-  /** Use the execution handler in netty on the client - default true */
-  boolean NETTY_CLIENT_USE_EXECUTION_HANDLER_DEFAULT = true;
+  BooleanConfOption NETTY_CLIENT_USE_EXECUTION_HANDLER =
+      new BooleanConfOption("giraph.nettyClientUseExecutionHandler", true);
 
   /** Netty client execution threads (execution handler) */
-  String NETTY_CLIENT_EXECUTION_THREADS =
-      "giraph.nettyClientExecutionThreads";
-  /** Default Netty client execution threads (execution handler) of 8 */
-  int NETTY_CLIENT_EXECUTION_THREADS_DEFAULT = 8;
+  IntConfOption NETTY_CLIENT_EXECUTION_THREADS =
+      new IntConfOption("giraph.nettyClientExecutionThreads", 8);
 
   /** Where to place the netty client execution handle? */
-  String NETTY_CLIENT_EXECUTION_AFTER_HANDLER =
-      "giraph.nettyClientExecutionAfterHandler";
-  /**
-   * Default is to use the netty client execution handle after the request
-   * encoder.
-   */
-  String NETTY_CLIENT_EXECUTION_AFTER_HANDLER_DEFAULT = "requestEncoder";
+  StrConfOption NETTY_CLIENT_EXECUTION_AFTER_HANDLER =
+      new StrConfOption("giraph.nettyClientExecutionAfterHandler",
+          "requestEncoder");
 
   /** Use the execution handler in netty on the server? */
-  String NETTY_SERVER_USE_EXECUTION_HANDLER =
-      "giraph.nettyServerUseExecutionHandler";
-  /** Use the execution handler in netty on the server - default true */
-  boolean NETTY_SERVER_USE_EXECUTION_HANDLER_DEFAULT = true;
+  BooleanConfOption NETTY_SERVER_USE_EXECUTION_HANDLER =
+      new BooleanConfOption("giraph.nettyServerUseExecutionHandler", true);
 
   /** Netty server execution threads (execution handler) */
-  String NETTY_SERVER_EXECUTION_THREADS = "giraph.nettyServerExecutionThreads";
-  /** Default Netty server execution threads (execution handler) of 8 */
-  int NETTY_SERVER_EXECUTION_THREADS_DEFAULT = 8;
+  IntConfOption NETTY_SERVER_EXECUTION_THREADS =
+      new IntConfOption("giraph.nettyServerExecutionThreads", 8);
 
   /** Where to place the netty server execution handle? */
-  String NETTY_SERVER_EXECUTION_AFTER_HANDLER =
-      "giraph.nettyServerExecutionAfterHandler";
-  /**
-   * Default is to use the netty server execution handle after the request
-   * frame decoder.
-   */
-  String NETTY_SERVER_EXECUTION_AFTER_HANDLER_DEFAULT = "requestFrameDecoder";
+  StrConfOption NETTY_SERVER_EXECUTION_AFTER_HANDLER =
+      new StrConfOption("giraph.nettyServerExecutionAfterHandler",
+          "requestFrameDecoder");
 
   /** Netty simulate a first request closed */
-  String NETTY_SIMULATE_FIRST_REQUEST_CLOSED =
-      "giraph.nettySimulateFirstRequestClosed";
-  /** Default of not simulating failure for first request */
-  boolean NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT = false;
+  BooleanConfOption NETTY_SIMULATE_FIRST_REQUEST_CLOSED =
+      new BooleanConfOption("giraph.nettySimulateFirstRequestClosed", false);
 
   /** Netty simulate a first response failed */
-  String NETTY_SIMULATE_FIRST_RESPONSE_FAILED =
-      "giraph.nettySimulateFirstResponseFailed";
-  /** Default of not simulating failure for first reponse */
-  boolean NETTY_SIMULATE_FIRST_RESPONSE_FAILED_DEFAULT = false;
+  BooleanConfOption NETTY_SIMULATE_FIRST_RESPONSE_FAILED =
+      new BooleanConfOption("giraph.nettySimulateFirstResponseFailed", false);
 
   /** Max resolve address attempts */
-  String MAX_RESOLVE_ADDRESS_ATTEMPTS = "giraph.maxResolveAddressAttempts";
-  /** Default max resolve address attempts */
-  int MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT = 5;
+  IntConfOption MAX_RESOLVE_ADDRESS_ATTEMPTS =
+      new IntConfOption("giraph.maxResolveAddressAttempts", 5);
 
   /** Msecs to wait between waiting for all requests to finish */
-  String WAITING_REQUEST_MSECS = "giraph.waitingRequestMsecs";
-  /** Default msecs to wait between waiting for all requests to finish */
-  int WAITING_REQUEST_MSECS_DEFAULT = 15000;
+  IntConfOption WAITING_REQUEST_MSECS =
+      new IntConfOption("giraph.waitingRequestMsecs", SECONDS.toMillis(15));
 
   /** Millseconds to wait for an event before continuing */
-  String EVENT_WAIT_MSECS = "giraph.eventWaitMsecs";
-  /**
-   * Default milliseconds to wait for an event before continuing (30 seconds)
-   */
-  int EVENT_WAIT_MSECS_DEFAULT = 30 * 1000;
+  IntConfOption EVENT_WAIT_MSECS =
+      new IntConfOption("giraph.eventWaitMsecs", SECONDS.toMillis(30));
 
   /**
    * Maximum milliseconds to wait before giving up trying to get the minimum
    * number of workers before a superstep (int).
    */
-  String MAX_MASTER_SUPERSTEP_WAIT_MSECS = "giraph.maxMasterSuperstepWaitMsecs";
-  /**
-   * Default maximum milliseconds to wait before giving up trying to get
-   * the minimum number of workers before a superstep (10 minutes).
-   */
-  int MAX_MASTER_SUPERSTEP_WAIT_MSECS_DEFAULT = 10 * 60 * 1000;
+  IntConfOption MAX_MASTER_SUPERSTEP_WAIT_MSECS =
+      new IntConfOption("giraph.maxMasterSuperstepWaitMsecs",
+          MINUTES.toMillis(10));
 
   /** Milliseconds for a request to complete (or else resend) */
-  String MAX_REQUEST_MILLISECONDS = "giraph.maxRequestMilliseconds";
-  /** Maximum number of milliseconds for a request to complete (10 minutes) */
-  int MAX_REQUEST_MILLISECONDS_DEFAULT = 10 * 60 * 1000;
+  IntConfOption MAX_REQUEST_MILLISECONDS =
+      new IntConfOption("giraph.maxRequestMilliseconds", MINUTES.toMillis(10));
 
   /** Netty max connection failures */
-  String NETTY_MAX_CONNECTION_FAILURES = "giraph.nettyMaxConnectionFailures";
-  /** Default Netty max connection failures */
-  int NETTY_MAX_CONNECTION_FAILURES_DEFAULT = 1000;
+  IntConfOption NETTY_MAX_CONNECTION_FAILURES =
+      new IntConfOption("giraph.nettyMaxConnectionFailures", 1000);
 
   /** Initial port to start using for the IPC communication */
-  String IPC_INITIAL_PORT = "giraph.ipcInitialPort";
-  /** Default port to start using for the IPC communication */
-  int IPC_INITIAL_PORT_DEFAULT = 30000;
+  IntConfOption IPC_INITIAL_PORT =
+      new IntConfOption("giraph.ipcInitialPort", 30000);
 
   /** Maximum bind attempts for different IPC ports */
-  String MAX_IPC_PORT_BIND_ATTEMPTS = "giraph.maxIpcPortBindAttempts";
-  /** Default maximum bind attempts for different IPC ports */
-  int MAX_IPC_PORT_BIND_ATTEMPTS_DEFAULT = 20;
+  IntConfOption MAX_IPC_PORT_BIND_ATTEMPTS =
+      new IntConfOption("giraph.maxIpcPortBindAttempts", 20);
   /**
    * Fail first IPC port binding attempt, simulate binding failure
    * on real grid testing
    */
-  String FAIL_FIRST_IPC_PORT_BIND_ATTEMPT =
-      "giraph.failFirstIpcPortBindAttempt";
-  /** Default fail first IPC port binding attempt flag */
-  boolean FAIL_FIRST_IPC_PORT_BIND_ATTEMPT_DEFAULT = false;
+  BooleanConfOption FAIL_FIRST_IPC_PORT_BIND_ATTEMPT =
+      new BooleanConfOption("giraph.failFirstIpcPortBindAttempt", false);
 
   /** Client send buffer size */
-  String CLIENT_SEND_BUFFER_SIZE = "giraph.clientSendBufferSize";
-  /** Default client send buffer size of 0.5 MB */
-  int DEFAULT_CLIENT_SEND_BUFFER_SIZE = 512 * 1024;
+  IntConfOption CLIENT_SEND_BUFFER_SIZE =
+      new IntConfOption("giraph.clientSendBufferSize", 512 * ONE_KB);
 
   /** Client receive buffer size */
-  String CLIENT_RECEIVE_BUFFER_SIZE = "giraph.clientReceiveBufferSize";
-  /** Default client receive buffer size of 32 k */
-  int DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE = 32 * 1024;
+  IntConfOption CLIENT_RECEIVE_BUFFER_SIZE =
+      new IntConfOption("giraph.clientReceiveBufferSize", 32 * ONE_KB);
 
   /** Server send buffer size */
-  String SERVER_SEND_BUFFER_SIZE = "giraph.serverSendBufferSize";
-  /** Default server send buffer size of 32 k */
-  int DEFAULT_SERVER_SEND_BUFFER_SIZE = 32 * 1024;
+  IntConfOption SERVER_SEND_BUFFER_SIZE =
+      new IntConfOption("giraph.serverSendBufferSize", 32 * ONE_KB);
 
   /** Server receive buffer size */
-  String SERVER_RECEIVE_BUFFER_SIZE = "giraph.serverReceiveBufferSize";
-  /** Default server receive buffer size of 0.5 MB */
-  int DEFAULT_SERVER_RECEIVE_BUFFER_SIZE = 512 * 1024;
+  IntConfOption SERVER_RECEIVE_BUFFER_SIZE =
+      new IntConfOption("giraph.serverReceiveBufferSize", 512 * ONE_KB);
 
   /** Maximum size of messages (in bytes) per peer before flush */
-  String MAX_MSG_REQUEST_SIZE = "giraph.msgRequestSize";
-  /** Default maximum size of messages per peer before flush of 0.5MB */
-  int MAX_MSG_REQUEST_SIZE_DEFAULT = 512 * 1024;
+  IntConfOption MAX_MSG_REQUEST_SIZE =
+      new IntConfOption("giraph.msgRequestSize", 512 * ONE_KB);
 
   /**
    * How much bigger than the average per partition size to make initial per
@@ -368,34 +370,23 @@ public interface GiraphConstants {
    * and a worker has P partitions, than its initial partition buffer size
    * will be (M / P) * (1 + A).
    */
-  String ADDITIONAL_MSG_REQUEST_SIZE =
-      "giraph.additionalMsgRequestSize";
-  /**
-   * Default factor for how bigger should initial per partition buffers be
-   * of 20%.
-   */
-  float ADDITIONAL_MSG_REQUEST_SIZE_DEFAULT = 0.2f;
+  FloatConfOption ADDITIONAL_MSG_REQUEST_SIZE =
+      new FloatConfOption("giraph.additionalMsgRequestSize", 0.2f);
 
   /** Maximum size of edges (in bytes) per peer before flush */
-  String MAX_EDGE_REQUEST_SIZE = "giraph.edgeRequestSize";
-  /** Default maximum size of edges per peer before flush of 0.5MB */
-  int MAX_EDGE_REQUEST_SIZE_DEFAULT = 512 * 1024;
+  IntConfOption MAX_EDGE_REQUEST_SIZE =
+      new IntConfOption("giraph.edgeRequestSize", 512 * ONE_KB);
 
   /**
    * Additional size (expressed as a ratio) of each per-partition buffer on
    * top of the average size.
    */
-  String ADDITIONAL_EDGE_REQUEST_SIZE =
-      "giraph.additionalEdgeRequestSize";
-  /**
-   * Default additional per-partition buffer size.
-   */
-  float ADDITIONAL_EDGE_REQUEST_SIZE_DEFAULT = 0.2f;
+  FloatConfOption ADDITIONAL_EDGE_REQUEST_SIZE =
+      new FloatConfOption("giraph.additionalEdgeRequestSize", 0.2f);
 
   /** Maximum number of mutations per partition before flush */
-  String MAX_MUTATIONS_PER_REQUEST = "giraph.maxMutationsPerRequest";
-  /** Default maximum number of mutations per partition before flush */
-  int MAX_MUTATIONS_PER_REQUEST_DEFAULT = 100;
+  IntConfOption MAX_MUTATIONS_PER_REQUEST =
+      new IntConfOption("giraph.maxMutationsPerRequest", 100);
 
   /**
    * Whether we should reuse the same Edge object when adding edges from
@@ -403,59 +394,38 @@ public interface GiraphConstants {
    * This works with edge storage implementations that don't keep references
    * to the input Edge objects (e.g., ByteArrayVertex).
    */
-  String REUSE_INCOMING_EDGE_OBJECTS = "giraph.reuseIncomingEdgeObjects";
-  /**
-   * Default is to not reuse edge objects (since it's not compatible with
-   * all storage implementations).
-   */
-  boolean REUSE_INCOMING_EDGE_OBJECTS_DEFAULT = false;
+  BooleanConfOption REUSE_INCOMING_EDGE_OBJECTS =
+      new BooleanConfOption("giraph.reuseIncomingEdgeObjects", false);
 
   /**
    * Use message size encoding (typically better for complex objects,
    * not meant for primitive wrapped messages)
    */
-  String USE_MESSAGE_SIZE_ENCODING = "giraph.useMessageSizeEncoding";
-  /**
-   * By default, do not use message size encoding as it is experimental.
-   */
-  boolean USE_MESSAGE_SIZE_ENCODING_DEFAULT = false;
+  BooleanConfOption USE_MESSAGE_SIZE_ENCODING =
+      new BooleanConfOption("giraph.useMessageSizeEncoding", false);
 
   /** Number of channels used per server */
-  String CHANNELS_PER_SERVER = "giraph.channelsPerServer";
-  /** Default number of channels used per server of 1 */
-  int DEFAULT_CHANNELS_PER_SERVER = 1;
+  IntConfOption CHANNELS_PER_SERVER =
+      new IntConfOption("giraph.channelsPerServer", 1);
 
   /** Number of flush threads per peer */
   String MSG_NUM_FLUSH_THREADS = "giraph.msgNumFlushThreads";
 
   /** Number of threads for vertex computation */
-  String NUM_COMPUTE_THREADS = "giraph.numComputeThreads";
-  /** Default number of threads for vertex computation */
-  int NUM_COMPUTE_THREADS_DEFAULT = 1;
+  IntConfOption NUM_COMPUTE_THREADS =
+      new IntConfOption("giraph.numComputeThreads", 1);
 
   /** Number of threads for input splits loading */
-  String NUM_INPUT_SPLITS_THREADS = "giraph.numInputSplitsThreads";
-  /** Default number of threads for input splits loading */
-  int NUM_INPUT_SPLITS_THREADS_DEFAULT = 1;
+  IntConfOption NUM_INPUT_SPLITS_THREADS =
+      new IntConfOption("giraph.numInputSplitsThreads", 1);
 
   /** Minimum stragglers of the superstep before printing them out */
-  String PARTITION_LONG_TAIL_MIN_PRINT = "giraph.partitionLongTailMinPrint";
-  /** Only print stragglers with one as a default */
-  int PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT = 1;
+  IntConfOption PARTITION_LONG_TAIL_MIN_PRINT =
+      new IntConfOption("giraph.partitionLongTailMinPrint", 1);
 
   /** Use superstep counters? (boolean) */
-  String USE_SUPERSTEP_COUNTERS = "giraph.useSuperstepCounters";
-  /** Default is to use the superstep counters */
-  boolean USE_SUPERSTEP_COUNTERS_DEFAULT = true;
-
-  /**
-   * Set the multiplicative factor of how many partitions to create from
-   * a single InputSplit based on the number of total InputSplits.  For
-   * example, if there are 10 total InputSplits and this is set to 0.5, then
-   * you will get 0.5 * 10 = 5 partitions for every InputSplit (given that the
-   * minimum size is met).
-   */
-  String TOTAL_INPUT_SPLIT_MULTIPLIER = "giraph.totalInputSplitMultiplier";
+  BooleanConfOption USE_SUPERSTEP_COUNTERS =
+      new BooleanConfOption("giraph.useSuperstepCounters", true);
 
   /**
    * Input split sample percent - Used only for sampling and testing, rather
@@ -463,31 +433,24 @@ public interface GiraphConstants {
    * fraction of the actual input splits from your VertexInputFormat to
    * load (values should be [0, 100]).
    */
-  String INPUT_SPLIT_SAMPLE_PERCENT = "giraph.inputSplitSamplePercent";
-  /** Default is to use all the input splits */
-  float INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT = 100f;
+  FloatConfOption INPUT_SPLIT_SAMPLE_PERCENT =
+      new FloatConfOption("giraph.inputSplitSamplePercent", 100f);
 
   /**
    * To limit outlier vertex input splits from producing too many vertices or
    * to help with testing, the number of vertices loaded from an input split
    * can be limited.  By default, everything is loaded.
    */
-  String INPUT_SPLIT_MAX_VERTICES = "giraph.InputSplitMaxVertices";
-  /**
-   * Default is that all the vertices are to be loaded from the input split
-   */
-  long INPUT_SPLIT_MAX_VERTICES_DEFAULT = -1;
+  LongConfOption INPUT_SPLIT_MAX_VERTICES =
+      new LongConfOption("giraph.InputSplitMaxVertices", -1);
 
   /**
    * To limit outlier vertex input splits from producing too many vertices or
    * to help with testing, the number of edges loaded from an input split
    * can be limited.  By default, everything is loaded.
    */
-  String INPUT_SPLIT_MAX_EDGES = "giraph.InputSplitMaxEdges";
-  /**
-   * Default is that all the edges are to be loaded from the input split
-   */
-  long INPUT_SPLIT_MAX_EDGES_DEFAULT = -1;
+  LongConfOption INPUT_SPLIT_MAX_EDGES =
+      new LongConfOption("giraph.InputSplitMaxEdges", -1);
 
   /**
    * To minimize network usage when reading input splits,
@@ -496,24 +459,16 @@ public interface GiraphConstants {
    * Hence, users with a lot of splits and input threads (or with
    * configurations that can't exploit locality) may want to disable it.
    */
-  String USE_INPUT_SPLIT_LOCALITY = "giraph.useInputSplitLocality";
-
-  /**
-   * Default is to prioritize local input splits.
-   */
-  boolean USE_INPUT_SPLIT_LOCALITY_DEFAULT = true;
+  BooleanConfOption USE_INPUT_SPLIT_LOCALITY =
+      new BooleanConfOption("giraph.useInputSplitLocality", true);
 
   /** Multiplier for the current workers squared */
-  String PARTITION_COUNT_MULTIPLIER =
-      "partition.masterPartitionCountMultipler";
-  /** Default mulitplier for current workers squared */
-  float DEFAULT_PARTITION_COUNT_MULTIPLIER = 1.0f;
+  FloatConfOption PARTITION_COUNT_MULTIPLIER =
+      new FloatConfOption("partition.masterPartitionCountMultipler", 1.0f);
 
   /** Overrides default partition count calculation if not -1 */
-  String USER_PARTITION_COUNT =
-      "partition.userPartitionCount";
-  /** Default user partition count */
-  int DEFAULT_USER_PARTITION_COUNT = -1;
+  IntConfOption USER_PARTITION_COUNT =
+      new IntConfOption("partition.userPartitionCount", -1);
 
   /** Vertex key space size for
    * {@link org.apache.giraph.partition.SimpleRangeWorkerPartitioner}
@@ -521,28 +476,23 @@ public interface GiraphConstants {
   String PARTITION_VERTEX_KEY_SPACE_SIZE = "partition.vertexKeySpaceSize";
 
   /** Java opts passed to ZooKeeper startup */
-  String ZOOKEEPER_JAVA_OPTS = "giraph.zkJavaOpts";
-  /** Default java opts passed to ZooKeeper startup */
-  String ZOOKEEPER_JAVA_OPTS_DEFAULT =
-      "-Xmx512m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC " +
-          "-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxGCPauseMillis=100";
+  StrConfOption ZOOKEEPER_JAVA_OPTS =
+      new StrConfOption("giraph.zkJavaOpts",
+          "-Xmx512m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC " +
+          "-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxGCPauseMillis=100");
 
   /**
    *  How often to checkpoint (i.e. 0, means no checkpoint,
    *  1 means every superstep, 2 is every two supersteps, etc.).
    */
-  String CHECKPOINT_FREQUENCY = "giraph.checkpointFrequency";
-
-  /** Default checkpointing frequency of none. */
-  int CHECKPOINT_FREQUENCY_DEFAULT = 0;
+  IntConfOption CHECKPOINT_FREQUENCY =
+      new IntConfOption("giraph.checkpointFrequency", 0);
 
   /**
    * Delete checkpoints after a successful job run?
    */
-  String CLEANUP_CHECKPOINTS_AFTER_SUCCESS =
-      "giraph.cleanupCheckpointsAfterSuccess";
-  /** Default is to clean up the checkponts after a successful job */
-  boolean CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT = true;
+  BooleanConfOption CLEANUP_CHECKPOINTS_AFTER_SUCCESS =
+      new BooleanConfOption("giraph.cleanupCheckpointsAfterSuccess", true);
 
   /**
    * An application can be restarted manually by selecting a superstep.  The
@@ -561,76 +511,56 @@ public interface GiraphConstants {
    * If ZOOKEEPER_LIST is not set, then use this directory to manage
    * ZooKeeper
    */
-  String ZOOKEEPER_MANAGER_DIRECTORY = "giraph.zkManagerDirectory";
-  /**
-   * Default ZooKeeper manager directory (where determining the servers in
-   * HDFS files will go).  directory path will also have job number
-   * for uniqueness.
-   */
-  String ZOOKEEPER_MANAGER_DIR_DEFAULT = "_bsp/_defaultZkManagerDir";
+  StrConfOption ZOOKEEPER_MANAGER_DIRECTORY =
+      new StrConfOption("giraph.zkManagerDirectory",
+          "_bsp/_defaultZkManagerDir");
 
   /** Number of ZooKeeper client connection attempts before giving up. */
-  String ZOOKEEPER_CONNECTION_ATTEMPTS = "giraph.zkConnectionAttempts";
-  /** Default of 10 ZooKeeper client connection attempts before giving up. */
-  int ZOOKEEPER_CONNECTION_ATTEMPTS_DEFAULT = 10;
+  IntConfOption ZOOKEEPER_CONNECTION_ATTEMPTS =
+      new IntConfOption("giraph.zkConnectionAttempts", 10);
 
   /** This directory has/stores the available checkpoint files in HDFS. */
-  String CHECKPOINT_DIRECTORY = "giraph.checkpointDirectory";
-  /**
-   * Default checkpoint directory (where checkpoing files go in HDFS).  Final
-   * directory path will also have the job number for uniqueness
-   */
-  String CHECKPOINT_DIRECTORY_DEFAULT = "_bsp/_checkpoints/";
+  StrConfOption CHECKPOINT_DIRECTORY =
+      new StrConfOption("giraph.checkpointDirectory", "_bsp/_checkpoints/");
 
   /**
    * Comma-separated list of directories in the local file system for
    * out-of-core messages.
    */
-  String MESSAGES_DIRECTORY = "giraph.messagesDirectory";
-  /**
-   * Default messages directory. directory path will also have the
-   * job number for uniqueness
-   */
-  String MESSAGES_DIRECTORY_DEFAULT = "_bsp/_messages/";
+  StrConfOption MESSAGES_DIRECTORY =
+      new StrConfOption("giraph.messagesDirectory", "_bsp/_messages/");
 
   /** Whether or not to use out-of-core messages */
-  String USE_OUT_OF_CORE_MESSAGES = "giraph.useOutOfCoreMessages";
-  /** Default choice about using out-of-core messaging */
-  boolean USE_OUT_OF_CORE_MESSAGES_DEFAULT = false;
+  BooleanConfOption USE_OUT_OF_CORE_MESSAGES =
+      new BooleanConfOption("giraph.useOutOfCoreMessages", false);
   /**
    * If using out-of-core messaging, it tells how much messages do we keep
    * in memory.
    */
-  String MAX_MESSAGES_IN_MEMORY = "giraph.maxMessagesInMemory";
-  /** Default maximum number of messages in memory. */
-  int MAX_MESSAGES_IN_MEMORY_DEFAULT = 1000000;
+  IntConfOption MAX_MESSAGES_IN_MEMORY =
+      new IntConfOption("giraph.maxMessagesInMemory", 1000000);
   /** Size of buffer when reading and writing messages out-of-core. */
-  String MESSAGES_BUFFER_SIZE = "giraph.messagesBufferSize";
-  /** Default size of buffer when reading and writing messages out-of-core. */
-  int MESSAGES_BUFFER_SIZE_DEFAULT = 8192;
+  IntConfOption MESSAGES_BUFFER_SIZE =
+      new IntConfOption("giraph.messagesBufferSize", 8 * ONE_KB);
 
   /**
    * Comma-separated list of directories in the local filesystem for
    * out-of-core partitions.
    */
-  String PARTITIONS_DIRECTORY = "giraph.partitionsDirectory";
-  /** Default directory for out-of-core partitions. */
-  String PARTITIONS_DIRECTORY_DEFAULT = "_bsp/_partitions";
+  StrConfOption PARTITIONS_DIRECTORY =
+      new StrConfOption("giraph.partitionsDirectory", "_bsp/_partitions");
 
   /** Enable out-of-core graph. */
-  String USE_OUT_OF_CORE_GRAPH = "giraph.useOutOfCoreGraph";
-  /** Default is not to use out-of-core graph. */
-  boolean USE_OUT_OF_CORE_GRAPH_DEFAULT = false;
+  BooleanConfOption USE_OUT_OF_CORE_GRAPH =
+      new BooleanConfOption("giraph.useOutOfCoreGraph", false);
 
   /** Maximum number of partitions to hold in memory for each worker. */
-  String MAX_PARTITIONS_IN_MEMORY = "giraph.maxPartitionsInMemory";
-  /** Default maximum number of in-memory partitions. */
-  int MAX_PARTITIONS_IN_MEMORY_DEFAULT = 10;
+  IntConfOption MAX_PARTITIONS_IN_MEMORY =
+      new IntConfOption("giraph.maxPartitionsInMemory", 10);
 
   /** Keep the zookeeper output for debugging? Default is to remove it. */
-  String KEEP_ZOOKEEPER_DATA = "giraph.keepZooKeeperData";
-  /** Default is to remove ZooKeeper data. */
-  Boolean KEEP_ZOOKEEPER_DATA_DEFAULT = false;
+  BooleanConfOption KEEP_ZOOKEEPER_DATA =
+      new BooleanConfOption("giraph.keepZooKeeperData", false);
 
   /** Default ZooKeeper tick time. */
   int DEFAULT_ZOOKEEPER_TICK_TIME = 6000;
@@ -643,57 +573,50 @@ public interface GiraphConstants {
   /** Default ZooKeeper maximum client connections. */
   int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 10000;
   /** ZooKeeper minimum session timeout */
-  String ZOOKEEPER_MIN_SESSION_TIMEOUT = "giraph.zKMinSessionTimeout";
-  /** Default ZooKeeper minimum session timeout of 10 minutes (in msecs). */
-  int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 600 * 1000;
+  IntConfOption ZOOKEEPER_MIN_SESSION_TIMEOUT =
+      new IntConfOption("giraph.zKMinSessionTimeout", MINUTES.toMillis(10));
   /** ZooKeeper maximum session timeout */
-  String ZOOKEEPER_MAX_SESSION_TIMEOUT = "giraph.zkMaxSessionTimeout";
-  /** Default ZooKeeper maximum session timeout of 15 minutes (in msecs). */
-  int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 900 * 1000;
+  IntConfOption ZOOKEEPER_MAX_SESSION_TIMEOUT =
+      new IntConfOption("giraph.zkMaxSessionTimeout", MINUTES.toMillis(15));
   /** ZooKeeper force sync */
-  String ZOOKEEPER_FORCE_SYNC = "giraph.zKForceSync";
-  /** Default ZooKeeper force sync is off (for performance) */
-  String DEFAULT_ZOOKEEPER_FORCE_SYNC = "no";
+  StrConfOption ZOOKEEPER_FORCE_SYNC =
+      new StrConfOption("giraph.zKForceSync", "no");
   /** ZooKeeper skip ACLs */
-  String ZOOKEEPER_SKIP_ACL = "giraph.ZkSkipAcl";
-  /** Default ZooKeeper skip ACLs true (for performance) */
-  String DEFAULT_ZOOKEEPER_SKIP_ACL = "yes";
+  StrConfOption ZOOKEEPER_SKIP_ACL =
+      new StrConfOption("giraph.ZkSkipAcl", "yes");
 
   /**
    * Whether to use SASL with DIGEST and Hadoop Job Tokens to authenticate
    * and authorize Netty BSP Clients to Servers.
    */
-  String AUTHENTICATE = "giraph.authenticate";
-  /** Default is not to do authenticate and authorization with Netty. */
-  boolean DEFAULT_AUTHENTICATE = false;
+  BooleanConfOption AUTHENTICATE =
+      new BooleanConfOption("giraph.authenticate", false);
 
   /** Use unsafe serialization? */
-  String USE_UNSAFE_SERIALIZATION = "giraph.useUnsafeSerialization";
-  /**
-   * Use unsafe serialization default is true (use it if you can,
-   * its much faster)!
-   */
-  boolean USE_UNSAFE_SERIALIZATION_DEFAULT = true;
+  BooleanConfOption USE_UNSAFE_SERIALIZATION =
+      new BooleanConfOption("giraph.useUnsafeSerialization", true);
 
   /**
    * Maximum number of attempts a master/worker will retry before killing
    * the job.  This directly maps to the number of map task attempts in
    * Hadoop.
    */
-  String MAX_TASK_ATTEMPTS = "mapred.map.max.attempts";
+  IntConfOption MAX_TASK_ATTEMPTS =
+      new IntConfOption("mapred.map.max.attempts", -1);
 
   /** Interface to use for hostname resolution */
-  String DNS_INTERFACE = "giraph.dns.interface";
+  StrConfOption DNS_INTERFACE =
+      new StrConfOption("giraph.dns.interface", "default");
   /** Server for hostname resolution */
-  String DNS_NAMESERVER = "giraph.dns.nameserver";
+  StrConfOption DNS_NAMESERVER =
+      new StrConfOption("giraph.dns.nameserver", "default");
 
   /**
    * The application will halt after this many supersteps is completed.  For
    * instance, if it is set to 3, the application will run at most 0, 1,
    * and 2 supersteps and then go into the shutdown superstep.
    */
-  String MAX_NUMBER_OF_SUPERSTEPS = "giraph.maxNumberOfSupersteps";
-  /** By default, the number of supersteps is not limited */
-  int MAX_NUMBER_OF_SUPERSTEPS_DEFAULT = -1;
+  IntConfOption MAX_NUMBER_OF_SUPERSTEPS =
+      new IntConfOption("giraph.maxNumberOfSupersteps", -1); // no limit
 }
 // CHECKSTYLE: resume InterfaceIsTypeCheck

http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index de85ab6..0af8b97 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -53,6 +53,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
 
+import static org.apache.giraph.conf.GiraphConstants.USE_UNSAFE_SERIALIZATION;
+
 /**
  * The classes set here are immutable, the remaining configuration is mutable.
  * Classes are immutable and final to provide the best performance for
@@ -87,8 +89,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   public ImmutableClassesGiraphConfiguration(Configuration conf) {
     super(conf);
     classes = new GiraphClasses(conf);
-    useUnsafeSerialization = getBoolean(USE_UNSAFE_SERIALIZATION,
-        USE_UNSAFE_SERIALIZATION_DEFAULT);
+    useUnsafeSerialization = USE_UNSAFE_SERIALIZATION.get(this);
     try {
       vertexValueFactory = (VertexValueFactory<V>)
           classes.getVertexValueFactoryClass().newInstance();

http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java
new file mode 100644
index 0000000..de75e9d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Integer configuration option: a key paired with an int default value
+ */
+public class IntConfOption extends AbstractConfOption {
+  /** Default value handed to the lookup in {@link #get(Configuration)} */
+  private final int defaultValue;
+
+  /**
+   * Constructor, also adds this option to AllOptions
+   * @param key key
+   * @param defaultValue default value
+   */
+  public IntConfOption(String key, int defaultValue) {
+    super(key);
+    this.defaultValue = defaultValue;
+    AllOptions.add(this);
+  }
+
+  /**
+   * Constructor taking a long default, which is narrowed with an (int) cast
+   * @param key key
+   * @param defaultValue default value; bits above the int range are dropped
+   */
+  public IntConfOption(String key, long defaultValue) {
+    super(key);
+    this.defaultValue = (int) defaultValue;
+    AllOptions.add(this);
+  }
+
+  public int getDefaultValue() {
+    return defaultValue;
+  }
+
+  @Override public String getDefaultValueStr() {
+    return Integer.toString(defaultValue);
+  }
+
+  @Override public ConfOptionType getType() {
+    return ConfOptionType.INTEGER;
+  }
+
+  /**
+   * Lookup value through conf.getInt()
+   * @param conf Configuration
+   * @return value for key, or default value if not set
+   */
+  public int get(Configuration conf) {
+    return conf.getInt(getKey(), defaultValue);
+  }
+
+  /**
+   * Set value through conf.setInt()
+   * @param conf Configuration
+   * @param value to set
+   */
+  public void set(Configuration conf, int value) {
+    conf.setInt(getKey(), value);
+  }
+
+  /**
+   * Set value only when conf.get() returns null for the key
+   * @param conf Configuration
+   * @param value to set
+   */
+  public void setIfUnset(Configuration conf, int value) {
+    if (conf.get(getKey()) == null) {
+      conf.setInt(getKey(), value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java
new file mode 100644
index 0000000..0cbc164
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Long configuration option: a key paired with a long default value
+ */
+public class LongConfOption extends AbstractConfOption {
+  /** Default value handed to the lookup in {@link #get(Configuration)} */
+  private final long defaultValue;
+
+  /**
+   * Constructor, also adds this option to AllOptions
+   * @param key key
+   * @param defaultValue default value
+   */
+  public LongConfOption(String key, long defaultValue) {
+    super(key);
+    this.defaultValue = defaultValue;
+    AllOptions.add(this);
+  }
+
+  public long getDefaultValue() {
+    return defaultValue;
+  }
+
+  @Override public String getDefaultValueStr() {
+    return Long.toString(defaultValue);
+  }
+
+  @Override public ConfOptionType getType() {
+    return ConfOptionType.LONG;
+  }
+
+  /**
+   * Lookup value through conf.getLong()
+   * @param conf Configuration
+   * @return value set for key, or defaultValue
+   */
+  public long get(Configuration conf) {
+    return conf.getLong(getKey(), defaultValue);
+  }
+
+  /**
+   * Set value for key through conf.setLong()
+   * @param conf Configuration
+   * @param value to set
+   */
+  public void set(Configuration conf, long value) {
+    conf.setLong(getKey(), value);
+  }
+
+  /**
+   * Set value only when conf.get() returns null for the key
+   * @param conf Configuration
+   * @param value to set
+   */
+  public void setIfUnset(Configuration conf, long value) {
+    if (conf.get(getKey()) == null) {
+      conf.setLong(getKey(), value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java
new file mode 100644
index 0000000..83a583d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * String Configuration option: a key paired with a String default value
+ */
+public class StrConfOption extends AbstractConfOption {
+  /** Default value handed to the lookups in get() and getArray() */
+  private final String defaultValue;
+
+  /**
+   * Constructor, also adds this option to AllOptions
+   * @param key key
+   * @param defaultValue default value
+   */
+  public StrConfOption(String key, String defaultValue) {
+    super(key);
+    this.defaultValue = defaultValue;
+    AllOptions.add(this);
+  }
+
+  public String getDefaultValue() {
+    return defaultValue;
+  }
+
+  @Override public String getDefaultValueStr() {
+    return defaultValue;
+  }
+
+  @Override public ConfOptionType getType() {
+    return ConfOptionType.STRING;
+  }
+
+  /**
+   * Lookup value through conf.get()
+   * @param conf Configuration
+   * @return value for key, or defaultValue
+   */
+  public String get(Configuration conf) {
+    return conf.get(getKey(), defaultValue);
+  }
+
+  /**
+   * Lookup value, using defaultVal instead of this option's own default
+   * @param conf Configuration
+   * @param defaultVal default value to use
+   * @return value for key, or defaultVal passed in
+   */
+  public String getWithDefault(Configuration conf, String defaultVal) {
+    return conf.get(getKey(), defaultVal);
+  }
+
+  /**
+   * Get array of values for key, passing defaultValue as the fallback
+   * @param conf Configuration
+   * @return array of values for key
+   */
+  public String[] getArray(Configuration conf) {
+    return conf.getStrings(getKey(), defaultValue);
+  }
+
+  /**
+   * Get list of values for key, as a new list built from getArray(conf)
+   * @param conf Configuration
+   * @return list of values for key
+   */
+  public List<String> getList(Configuration conf) {
+    return Lists.newArrayList(getArray(conf));
+  }
+
+  /**
+   * Set value for key through conf.set()
+   * @param conf Configuration
+   * @param value to set
+   */
+  public void set(Configuration conf, String value) {
+    conf.set(getKey(), value);
+  }
+
+  /**
+   * Set value if not already present, delegating to conf.setIfUnset()
+   * @param conf Configuration
+   * @param value to set
+   */
+  public void setIfUnset(Configuration conf, String value) {
+    conf.setIfUnset(getKey(), value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index e74c59a..57f7dff 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -18,8 +18,6 @@
 
 package org.apache.giraph.graph;
 
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -57,6 +55,9 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.net.URL;
@@ -72,6 +73,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.giraph.conf.GiraphConstants.EDGE_VALUE_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.MESSAGE_VALUE_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_ID_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_CLASS;
+
 /**
  * The Giraph-specific business logic for a single BSP
  * compute node in whatever underlying type of cluster
@@ -440,18 +446,10 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     Type vertexValueType = classList.get(1);
     Type edgeValueType = classList.get(2);
     Type messageValueType = classList.get(3);
-    conf.setClass(GiraphConstants.VERTEX_ID_CLASS,
-        (Class<?>) vertexIndexType,
-        WritableComparable.class);
-    conf.setClass(GiraphConstants.VERTEX_VALUE_CLASS,
-        (Class<?>) vertexValueType,
-        Writable.class);
-    conf.setClass(GiraphConstants.EDGE_VALUE_CLASS,
-        (Class<?>) edgeValueType,
-        Writable.class);
-    conf.setClass(GiraphConstants.MESSAGE_VALUE_CLASS,
-        (Class<?>) messageValueType,
-        Writable.class);
+    VERTEX_ID_CLASS.set(conf, (Class<WritableComparable>) vertexIndexType);
+    VERTEX_VALUE_CLASS.set(conf, (Class<Writable>) vertexValueType);
+    EDGE_VALUE_CLASS.set(conf, (Class<Writable>) edgeValueType);
+    MESSAGE_VALUE_CLASS.set(conf, (Class<Writable>) messageValueType);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
index 1e05773..f55cf18 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
@@ -21,7 +21,6 @@ package org.apache.giraph.job;
 import org.apache.giraph.combiner.Combiner;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.edge.VertexEdges;
 import org.apache.giraph.graph.DefaultVertexResolver;
 import org.apache.giraph.graph.DefaultVertexValueFactory;
@@ -40,6 +39,9 @@ import org.apache.log4j.Logger;
 import java.lang.reflect.Type;
 import java.util.List;
 
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_EDGES_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_RESOLVER_CLASS;
+
 /**
  * GiraphConfigurationValidator attempts to verify the consistency of
  * user-chosen InputFormat, OutputFormat, and Vertex generic type
@@ -138,34 +140,36 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
         conf.getMinPercentResponded() > 100.0f) {
       throw new IllegalArgumentException(
           "checkConfiguration: Invalid " + conf.getMinPercentResponded() +
-              " for " + GiraphConstants.MIN_PERCENT_RESPONDED);
+              " for " + GiraphConstants.MIN_PERCENT_RESPONDED.getKey());
     }
     if (conf.getMinWorkers() < 0) {
       throw new IllegalArgumentException("checkConfiguration: No valid " +
           GiraphConstants.MIN_WORKERS);
     }
     if (conf.getVertexClass() == null) {
-      throw new IllegalArgumentException("checkConfiguration: Null" +
-          GiraphConstants.VERTEX_CLASS);
+      throw new IllegalArgumentException("checkConfiguration: Null " +
+          GiraphConstants.VERTEX_CLASS.getKey());
     }
     if (conf.getVertexInputFormatClass() == null &&
         conf.getEdgeInputFormatClass() == null) {
       throw new IllegalArgumentException("checkConfiguration: One of " +
-          GiraphConstants.VERTEX_INPUT_FORMAT_CLASS + " and " +
-          GiraphConstants.EDGE_INPUT_FORMAT_CLASS + " must be non-null");
+          GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.getKey() + " and " +
+          GiraphConstants.EDGE_INPUT_FORMAT_CLASS.getKey() +
+          " must be non-null");
     }
     if (conf.getVertexResolverClass() == null) {
       if (LOG.isInfoEnabled()) {
         LOG.info("checkConfiguration: No class found for " +
-            GiraphConstants.VERTEX_RESOLVER_CLASS + ", defaulting to " +
-            DefaultVertexResolver.class.getCanonicalName());
+            VERTEX_RESOLVER_CLASS.getKey() +
+            ", defaulting to " +
+            VERTEX_RESOLVER_CLASS.getDefaultClass().getCanonicalName());
       }
     }
     if (conf.getVertexEdgesClass() == null) {
       if (LOG.isInfoEnabled()) {
         LOG.info("checkConfiguration: No class found for " +
-            GiraphConstants.VERTEX_EDGES_CLASS + ", defaulting to " +
-            ByteArrayEdges.class.getCanonicalName());
+            VERTEX_EDGES_CLASS.getKey() + ", defaulting to " +
+            VERTEX_EDGES_CLASS.getDefaultClass().getCanonicalName());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
index da85d1c..6849ca3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
@@ -214,7 +214,7 @@ public class GiraphJob {
       if (LOG.isInfoEnabled()) {
         LOG.info("run: Since checkpointing is disabled (default), " +
             "do not allow any task retries (setting " +
-            GiraphConstants.MAX_TASK_ATTEMPTS + " = 0, " +
+            GiraphConstants.MAX_TASK_ATTEMPTS.getKey() + " = 0, " +
             "old value = " + oldMaxTaskAttempts + ")");
       }
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 6c979d6..404e47e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -104,6 +104,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT;
+import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA;
+import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT;
+import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY;
+
 /**
  * ZooKeeper-based implementation of {@link CentralizedServiceMaster}.
  *
@@ -201,13 +206,10 @@ public class BspServiceMaster<I extends WritableComparable,
     maxWorkers = conf.getMaxWorkers();
     minWorkers = conf.getMinWorkers();
     maxNumberOfSupersteps = conf.getMaxNumberOfSupersteps();
-    minPercentResponded = conf.getFloat(
-        GiraphConstants.MIN_PERCENT_RESPONDED, 100.0f);
+    minPercentResponded = GiraphConstants.MIN_PERCENT_RESPONDED.get(conf);
     eventWaitMsecs = conf.getEventWaitMsecs();
     maxSuperstepWaitMsecs = conf.getMaxMasterSuperstepWaitMsecs();
-    partitionLongTailMinPrint = conf.getInt(
-        GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT,
-        GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT);
+    partitionLongTailMinPrint = PARTITION_LONG_TAIL_MIN_PRINT.get(conf);
     masterGraphPartitioner =
         getGraphPartitionerFactory().createMasterGraphPartitioner();
     if (conf.isJMapHistogramDumpEnabled()) {
@@ -316,11 +318,8 @@ public class BspServiceMaster<I extends WritableComparable,
           logPrefix + ": Got InterruptedException", e);
     }
     float samplePercent =
-        getConfiguration().getFloat(
-            GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT,
-            GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT);
-    if (samplePercent !=
-        GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT) {
+        INPUT_SPLIT_SAMPLE_PERCENT.get(getConfiguration());
+    if (samplePercent != INPUT_SPLIT_SAMPLE_PERCENT.getDefaultValue()) {
       int lastIndex = (int) (samplePercent * splits.size() / 100f);
       List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
       LOG.warn(logPrefix + ": Using sampling - Processing only " +
@@ -579,6 +578,7 @@ public class BspServiceMaster<I extends WritableComparable,
   private int createInputSplits(GiraphInputFormat inputFormat,
                                 InputSplitPaths inputSplitPaths,
                                 String inputSplitType) {
+    ImmutableClassesGiraphConfiguration conf = getConfiguration();
     String logPrefix = "create" + inputSplitType + "InputSplits";
     // Only the 'master' should be doing this.  Wait until the number of
     // processes that have reported health exceeds the minimum percentage.
@@ -612,7 +612,7 @@ public class BspServiceMaster<I extends WritableComparable,
 
     // Create at least as many splits as the total number of input threads.
     int minSplitCountHint = healthyWorkerInfoList.size() *
-        getConfiguration().getNumInputSplitsThreads();
+        conf.getNumInputSplitsThreads();
 
     // Note that the input splits may only be a sample if
     // INPUT_SPLIT_SAMPLE_PERCENT is set to something other than 100
@@ -635,8 +635,7 @@ public class BspServiceMaster<I extends WritableComparable,
     }
 
     // Write input splits to zookeeper in parallel
-    int inputSplitThreadCount = getConfiguration().getInt(
-        INPUT_SPLIT_THREAD_COUNT,
+    int inputSplitThreadCount = conf.getInt(INPUT_SPLIT_THREAD_COUNT,
         DEFAULT_INPUT_SPLIT_THREAD_COUNT);
     if (LOG.isInfoEnabled()) {
       LOG.info(logPrefix + ": Starting to write input split data " +
@@ -644,9 +643,7 @@ public class BspServiceMaster<I extends WritableComparable,
     }
     ExecutorService taskExecutor =
         Executors.newFixedThreadPool(inputSplitThreadCount);
-    boolean writeLocations = getConfiguration().getBoolean(
-        GiraphConstants.USE_INPUT_SPLIT_LOCALITY,
-        GiraphConstants.USE_INPUT_SPLIT_LOCALITY_DEFAULT);
+    boolean writeLocations = USE_INPUT_SPLIT_LOCALITY.get(conf);
     for (int i = 0; i < splitList.size(); ++i) {
       InputSplit inputSplit = splitList.get(i);
       taskExecutor.submit(new WriteInputSplit(inputSplit, inputSplitsPath, i,
@@ -1356,9 +1353,7 @@ public class BspServiceMaster<I extends WritableComparable,
    */
   private void cleanUpOldSuperstep(long removeableSuperstep) throws
       InterruptedException {
-    if (!(getConfiguration().getBoolean(
-        GiraphConstants.KEEP_ZOOKEEPER_DATA,
-        GiraphConstants.KEEP_ZOOKEEPER_DATA_DEFAULT)) &&
+    if (KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration()) &&
         (removeableSuperstep >= 0)) {
       String oldSuperstepPath =
           getSuperstepPath(getApplicationAttempt()) + "/" +
@@ -1523,7 +1518,7 @@ public class BspServiceMaster<I extends WritableComparable,
     // If we have completed the maximum number of supersteps, stop
     // the computation
     if (maxNumberOfSupersteps !=
-        GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS_DEFAULT &&
+        GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS.getDefaultValue() &&
         (getSuperstep() == maxNumberOfSupersteps - 1)) {
       if (LOG.isInfoEnabled()) {
         LOG.info("coordinateSuperstep: Finished " + maxNumberOfSupersteps +
@@ -1661,9 +1656,7 @@ public class BspServiceMaster<I extends WritableComparable,
     // provided (not dynamically started) and we don't want to keep the data
     try {
       if (getConfiguration().getZookeeperList() != null &&
-          !getConfiguration().getBoolean(
-              GiraphConstants.KEEP_ZOOKEEPER_DATA,
-              GiraphConstants.KEEP_ZOOKEEPER_DATA_DEFAULT)) {
+          KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration())) {
         if (LOG.isInfoEnabled()) {
           LOG.info("cleanupZooKeeper: Removing the following path " +
               "and all children - " + basePath + " from ZooKeeper list " +
@@ -1713,6 +1706,8 @@ public class BspServiceMaster<I extends WritableComparable,
 
   @Override
   public void cleanup() throws IOException {
+    ImmutableClassesGiraphConfiguration conf = getConfiguration();
+
     // All master processes should denote they are done by adding special
     // znode.  Once the number of znodes equals the number of partitions
     // for workers and masters, the master will clean up the ZooKeeper
@@ -1744,9 +1739,7 @@ public class BspServiceMaster<I extends WritableComparable,
     if (isMaster) {
       cleanUpZooKeeper();
       // If desired, cleanup the checkpoint directory
-      if (getConfiguration().getBoolean(
-          GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS,
-          GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT)) {
+      if (GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.get(conf)) {
         boolean success =
             getFs().delete(new Path(checkpointBasePath), true);
         if (LOG.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
index 40c6b74..ba2f8eb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
@@ -19,11 +19,10 @@
 package org.apache.giraph.master;
 
 import org.apache.giraph.bsp.ApplicationState;
+import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.SuperstepState;
-import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.counters.GiraphTimers;
-import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -34,6 +33,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
 
+import static org.apache.giraph.conf.GiraphConstants.USE_SUPERSTEP_COUNTERS;
+
 /**
  * Master thread that will coordinate the activities of the tasks.  It runs
  * on all task processes, however, will only execute its algorithm if it knows
@@ -76,9 +77,7 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
     this.bspServiceMaster = bspServiceMaster;
     this.context = context;
     GiraphTimers.init(context);
-    superstepCounterOn = context.getConfiguration().getBoolean(
-        GiraphConstants.USE_SUPERSTEP_COUNTERS,
-        GiraphConstants.USE_SUPERSTEP_COUNTERS_DEFAULT);
+    superstepCounterOn = USE_SUPERSTEP_COUNTERS.get(context.getConfiguration());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
index 6bc9591..3525302 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -18,12 +18,6 @@
 
 package org.apache.giraph.partition;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
@@ -32,6 +26,12 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -55,6 +55,9 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import static org.apache.giraph.conf.GiraphConstants.MAX_PARTITIONS_IN_MEMORY;
+import static org.apache.giraph.conf.GiraphConstants.PARTITIONS_DIRECTORY;
+
 /**
  * Disk-backed PartitionStore. Partitions are stored in memory on a LRU basis.
  * Thread-safe, but expects the caller to synchronized between deletes, adds,
@@ -128,14 +131,10 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
     this.conf = conf;
     this.context = context;
     // We must be able to hold at least one partition in memory
-    maxInMemoryPartitions = Math.max(1,
-        conf.getInt(GiraphConstants.MAX_PARTITIONS_IN_MEMORY,
-            GiraphConstants.MAX_PARTITIONS_IN_MEMORY_DEFAULT));
+    maxInMemoryPartitions = Math.max(MAX_PARTITIONS_IN_MEMORY.get(conf), 1);
 
     // Take advantage of multiple disks
-    String[] userPaths = conf.getStrings(
-        GiraphConstants.PARTITIONS_DIRECTORY,
-        GiraphConstants.PARTITIONS_DIRECTORY_DEFAULT);
+    String[] userPaths = PARTITIONS_DIRECTORY.getArray(conf);
     basePaths = new String[userPaths.length];
     int i = 0;
     for (String path : userPaths) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
index c83ca45..fc75006 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
@@ -18,14 +18,15 @@
 
 package org.apache.giraph.partition;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -34,6 +35,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import static org.apache.giraph.conf.GiraphConstants.USER_PARTITION_COUNT;
+
 /**
  * Helper class for {@link Partition} related operations.
  */
@@ -166,13 +169,10 @@ public class PartitionUtils {
           "computePartitionCount: No available workers");
     }
 
-    int userPartitionCount = conf.getInt(GiraphConstants.USER_PARTITION_COUNT,
-        GiraphConstants.DEFAULT_USER_PARTITION_COUNT);
+    int userPartitionCount = USER_PARTITION_COUNT.get(conf);
     int partitionCount;
-    if (userPartitionCount == GiraphConstants.DEFAULT_USER_PARTITION_COUNT) {
-      float multiplier = conf.getFloat(
-          GiraphConstants.PARTITION_COUNT_MULTIPLIER,
-          GiraphConstants.DEFAULT_PARTITION_COUNT_MULTIPLIER);
+    if (userPartitionCount == USER_PARTITION_COUNT.getDefaultValue()) {
+      float multiplier = GiraphConstants.PARTITION_COUNT_MULTIPLIER.get(conf);
       partitionCount =
           Math.max((int) (multiplier * availableWorkerInfos.size() *
               availableWorkerInfos.size()),

http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
index ae8556f..23e0f05 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.partition;
 
-import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -33,6 +32,8 @@ import java.util.Iterator;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 
+import static org.apache.giraph.conf.GiraphConstants.USE_OUT_OF_CORE_MESSAGES;
+
 /**
  * A simple map-based container that stores vertices.  Vertex ids will map to
  * exactly one partition.
@@ -57,8 +58,7 @@ public class SimplePartition<I extends WritableComparable,
   @Override
   public void initialize(int partitionId, Progressable progressable) {
     super.initialize(partitionId, progressable);
-    if (getConf().getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
-        GiraphConstants.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
+    if (USE_OUT_OF_CORE_MESSAGES.get(getConf())) {
       vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
     } else {
       vertexMap = Maps.newConcurrentMap();
@@ -114,8 +114,7 @@ public class SimplePartition<I extends WritableComparable,
   @Override
   public void readFields(DataInput input) throws IOException {
     super.readFields(input);
-    if (getConf().getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
-        GiraphConstants.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
+    if (USE_OUT_OF_CORE_MESSAGES.get(getConf())) {
       vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
     } else {
       vertexMap = Maps.newConcurrentMap();

http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index 2bba672..4b03127 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -151,15 +151,15 @@ public class InternalVertexRunner {
       }
 
       conf.setWorkerConfiguration(1, 1, 100.0f);
-      conf.setBoolean(GiraphConstants.SPLIT_MASTER_WORKER, false);
-      conf.setBoolean(GiraphConstants.LOCAL_TEST_MODE, true);
+      GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
+      GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
       conf.set(GiraphConstants.ZOOKEEPER_LIST, "localhost:" +
           String.valueOf(LOCAL_ZOOKEEPER_PORT));
 
       conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
-      conf.set(GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY,
+      GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
           zkMgrDir.toString());
-      conf.set(GiraphConstants.CHECKPOINT_DIRECTORY, checkpointsDir.toString());
+      GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString());
 
       for (Map.Entry<String, String> param : params.entrySet()) {
         conf.set(param.getKey(), param.getValue());

http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
index 463510f..aedee6c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
@@ -98,10 +98,8 @@ public class JMapHistoDumper implements MasterObserver, WorkerObserver {
 
   @Override
   public void setConf(ImmutableClassesGiraphConfiguration configuration) {
-    sleepMillis = configuration.getInt(GiraphConstants.JMAP_SLEEP_MILLIS,
-        GiraphConstants.JMAP_SLEEP_MILLIS_DEFAULT);
-    linesToPrint = configuration.getInt(GiraphConstants.JMAP_PRINT_LINES,
-        GiraphConstants.JMAP_PRINT_LINES_DEFAULT);
+    sleepMillis = GiraphConstants.JMAP_SLEEP_MILLIS.get(configuration);
+    linesToPrint = GiraphConstants.JMAP_PRINT_LINES.get(configuration);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/zk/GiraphZooKeeperAdmin.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/GiraphZooKeeperAdmin.java b/giraph-core/src/main/java/org/apache/giraph/zk/GiraphZooKeeperAdmin.java
index 7589a09..19f6be5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/GiraphZooKeeperAdmin.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/GiraphZooKeeperAdmin.java
@@ -18,20 +18,22 @@
 package org.apache.giraph.zk;
 
 
-import static java.lang.System.out;
-
-import java.net.UnknownHostException;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
 import org.apache.giraph.bsp.BspService;
-import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.List;
+
+import static java.lang.System.out;
+import static org.apache.giraph.conf.GiraphConstants.ZOOKEEPER_SERVER_PORT;
 
 /**
  * A Utility class to be used by Giraph admins to occasionally clean up the
@@ -74,15 +76,13 @@ public class GiraphZooKeeperAdmin implements Watcher, Tool {
    */
   @Override
   public int run(String[] args) {
-    final int zkPort = getConf().getInt(
-      GiraphConfiguration.ZOOKEEPER_SERVER_PORT,
-      GiraphConfiguration.ZOOKEEPER_SERVER_PORT_DEFAULT);
+    final int zkPort = ZOOKEEPER_SERVER_PORT.get(getConf());
     final String zkBasePath = getConf().get(
-      GiraphConfiguration.BASE_ZNODE_KEY, "") + BspService.BASE_DIR;
+      GiraphConstants.BASE_ZNODE_KEY, "") + BspService.BASE_DIR;
     final String[] zkServerList;
     try {
       zkServerList = getConf()
-        .get(GiraphConfiguration.ZOOKEEPER_LIST).split(",");
+        .get(GiraphConstants.ZOOKEEPER_LIST).split(",");
     } catch (NullPointerException npe) {
       throw new IllegalStateException("GiraphZooKeeperAdmin requires a list " +
         "of ZooKeeper servers to clean.");
@@ -94,9 +94,9 @@ public class GiraphZooKeeperAdmin implements Watcher, Tool {
     try {
       ZooKeeperExt zooKeeper = new ZooKeeperExt(
         formatZkServerList(zkServerList, zkPort),
-        GiraphConfiguration.ZOOKEEPER_SESSION_TIMEOUT_DEFAULT,
-        GiraphConfiguration.ZOOKEEPER_OPS_MAX_ATTEMPTS_DEFAULT,
-        GiraphConfiguration.ZOOKEEPER_SERVERLIST_POLL_MSECS_DEFAULT,
+        GiraphConstants.ZOOKEEPER_SESSION_TIMEOUT.getDefaultValue(),
+        GiraphConstants.ZOOKEEPER_OPS_MAX_ATTEMPTS.getDefaultValue(),
+        GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.getDefaultValue(),
         this);
       doZooKeeperCleanup(zooKeeper, zkBasePath);
       return 0;

http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
index add57fc..82a7fc1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
@@ -52,6 +52,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.giraph.conf.GiraphConstants.BASE_ZNODE_KEY;
+import static org.apache.giraph.conf.GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY;
 
 
 /**
@@ -141,7 +142,7 @@ public class ZooKeeperManager {
     taskPartition = conf.getTaskPartition();
     jobId = conf.get("mapred.job.id", "Unknown Job");
     baseDirectory =
-        new Path(conf.get(GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY,
+        new Path(ZOOKEEPER_MANAGER_DIRECTORY.getWithDefault(conf,
             getFinalZooKeeperPath()));
     taskDirectory = new Path(baseDirectory,
         "_task");
@@ -150,25 +151,19 @@ public class ZooKeeperManager {
     myClosedPath = new Path(taskDirectory,
         Integer.toString(taskPartition) +
         COMPUTATION_DONE_SUFFIX);
-    pollMsecs = conf.getInt(
-        GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS,
-        GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS_DEFAULT);
-    serverCount = conf.getInt(
-        GiraphConstants.ZOOKEEPER_SERVER_COUNT,
-        GiraphConstants.ZOOKEEPER_SERVER_COUNT_DEFAULT);
+    pollMsecs = GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.get(conf);
+    serverCount = GiraphConstants.ZOOKEEPER_SERVER_COUNT.get(conf);
     String jobLocalDir = conf.get("job.local.dir");
     if (jobLocalDir != null) { // for non-local jobs
       zkDirDefault = jobLocalDir +
           "/_bspZooKeeper";
     } else {
       zkDirDefault = System.getProperty("user.dir") + "/" +
-              GiraphConstants.ZOOKEEPER_MANAGER_DIR_DEFAULT;
+              ZOOKEEPER_MANAGER_DIRECTORY.getDefaultValue();
     }
     zkDir = conf.get(GiraphConstants.ZOOKEEPER_DIR, zkDirDefault);
     configFilePath = zkDir + "/zoo.cfg";
-    zkBasePort = conf.getInt(
-        GiraphConstants.ZOOKEEPER_SERVER_PORT,
-        GiraphConstants.ZOOKEEPER_SERVER_PORT_DEFAULT);
+    zkBasePort = GiraphConstants.ZOOKEEPER_SERVER_PORT.get(conf);
 
     myHostname = conf.getLocalHostname();
     fs = FileSystem.get(conf);
@@ -180,7 +175,7 @@ public class ZooKeeperManager {
    * @return directory path with job id
    */
   private String getFinalZooKeeperPath() {
-    return GiraphConstants.ZOOKEEPER_MANAGER_DIR_DEFAULT + "/" + jobId;
+    return ZOOKEEPER_MANAGER_DIRECTORY.getDefaultValue() + "/" + jobId;
   }
 
   /**
@@ -305,7 +300,7 @@ public class ZooKeeperManager {
               "for base directory " + baseDirectory + ".  If there is an " +
               "issue with this directory, please set an accesible " +
               "base directory with the Hadoop configuration option " +
-              GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY);
+              ZOOKEEPER_MANAGER_DIRECTORY.getKey());
     }
 
     Path myCandidacyPath = new Path(
@@ -627,9 +622,7 @@ public class ZooKeeperManager {
             "onlineZooKeeperServers: java.home is not set!");
       }
       commandList.add(javaHome + "/bin/java");
-      String zkJavaOptsString =
-          conf.get(GiraphConstants.ZOOKEEPER_JAVA_OPTS,
-              GiraphConstants.ZOOKEEPER_JAVA_OPTS_DEFAULT);
+      String zkJavaOptsString = GiraphConstants.ZOOKEEPER_JAVA_OPTS.get(conf);
       String[] zkJavaOptsArray = zkJavaOptsString.split(" ");
       if (zkJavaOptsArray != null) {
         commandList.addAll(Arrays.asList(zkJavaOptsArray));

http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/test/java/org/apache/giraph/BspCase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/BspCase.java b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
index a6eef20..4c74d3f 100644
--- a/giraph-core/src/test/java/org/apache/giraph/BspCase.java
+++ b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
@@ -98,11 +98,11 @@ public class BspCase implements Watcher {
           "location " + getJarLocation() + " for " + getName());
       conf.setWorkerConfiguration(1, 1, 100.0f);
       // Single node testing
-      conf.setBoolean(GiraphConstants.SPLIT_MASTER_WORKER, false);
+      GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
     }
     conf.setMaxMasterSuperstepWaitMsecs(30 * 1000);
     conf.setEventWaitMsecs(3 * 1000);
-    conf.setInt(GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS, 500);
+    GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.set(conf, 500);
     if (getZooKeeperList() != null) {
       conf.setZooKeeperConfiguration(getZooKeeperList());
     }
@@ -120,9 +120,9 @@ public class BspCase implements Watcher {
     FileUtils.deletePath(conf, checkPointDir);
 
     conf.set(GiraphConstants.ZOOKEEPER_DIR, zookeeperDir.toString());
-    conf.set(GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY,
+    GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
         zkManagerDir.toString());
-    conf.set(GiraphConstants.CHECKPOINT_DIRECTORY, checkPointDir.toString());
+    GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkPointDir.toString());
 
     return conf;
   }