You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/03/28 01:02:29 UTC
[2/3] GIRAPH-587: Refactor configuration options (nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 7882d06..c5b9b93 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -17,72 +17,147 @@
*/
package org.apache.giraph.conf;
+import org.apache.giraph.aggregators.AggregatorWriter;
+import org.apache.giraph.aggregators.TextAggregatorWriter;
+import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.edge.VertexEdges;
+import org.apache.giraph.graph.DefaultVertexResolver;
+import org.apache.giraph.graph.DefaultVertexValueFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.graph.VertexValueFactory;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.job.DefaultJobObserver;
+import org.apache.giraph.job.GiraphJobObserver;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.master.MasterCompute;
+import org.apache.giraph.master.MasterObserver;
+import org.apache.giraph.partition.DefaultPartitionContext;
+import org.apache.giraph.partition.GraphPartitionerFactory;
+import org.apache.giraph.partition.HashPartitionerFactory;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionContext;
+import org.apache.giraph.partition.SimplePartition;
+import org.apache.giraph.worker.DefaultWorkerContext;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.worker.WorkerObserver;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
/**
* Constants used all over Giraph for configuration.
*/
// CHECKSTYLE: stop InterfaceIsTypeCheck
public interface GiraphConstants {
+ /** 1KB in bytes */
+ int ONE_KB = 1024;
+
/** Vertex class - required */
- String VERTEX_CLASS = "giraph.vertexClass";
+ ClassConfOption<Vertex> VERTEX_CLASS =
+ ClassConfOption.create("giraph.vertexClass", null, Vertex.class);
/** Vertex value factory class - optional */
- String VERTEX_VALUE_FACTORY_CLASS = "giraph.vertexValueFactoryClass";
+ ClassConfOption<VertexValueFactory> VERTEX_VALUE_FACTORY_CLASS =
+ ClassConfOption.create("giraph.vertexValueFactoryClass",
+ DefaultVertexValueFactory.class, VertexValueFactory.class);
/** Vertex edges class - optional */
- String VERTEX_EDGES_CLASS = "giraph.vertexEdgesClass";
+ ClassConfOption<VertexEdges> VERTEX_EDGES_CLASS =
+ ClassConfOption.create("giraph.vertexEdgesClass", ByteArrayEdges.class,
+ VertexEdges.class);
/** Vertex edges class to be used during edge input only - optional */
- String INPUT_VERTEX_EDGES_CLASS = "giraph.inputVertexEdgesClass";
+ ClassConfOption<VertexEdges> INPUT_VERTEX_EDGES_CLASS =
+ ClassConfOption.create("giraph.inputVertexEdgesClass",
+ ByteArrayEdges.class, VertexEdges.class);
/** Class for Master - optional */
- String MASTER_COMPUTE_CLASS = "giraph.masterComputeClass";
+ ClassConfOption<MasterCompute> MASTER_COMPUTE_CLASS =
+ ClassConfOption.create("giraph.masterComputeClass",
+ DefaultMasterCompute.class, MasterCompute.class);
/** Classes for Master Observer - optional */
- String MASTER_OBSERVER_CLASSES = "giraph.master.observers";
+ ClassConfOption<MasterObserver> MASTER_OBSERVER_CLASSES =
+ ClassConfOption.create("giraph.master.observers",
+ null, MasterObserver.class);
/** Classes for Worker Observer - optional */
- String WORKER_OBSERVER_CLASSES = "giraph.worker.observers";
+ ClassConfOption<WorkerObserver> WORKER_OBSERVER_CLASSES =
+ ClassConfOption.create("giraph.worker.observers", null,
+ WorkerObserver.class);
/** Vertex combiner class - optional */
- String VERTEX_COMBINER_CLASS = "giraph.combinerClass";
+ ClassConfOption<Combiner> VERTEX_COMBINER_CLASS =
+ ClassConfOption.create("giraph.combinerClass", null, Combiner.class);
/** Vertex resolver class - optional */
- String VERTEX_RESOLVER_CLASS = "giraph.vertexResolverClass";
+ ClassConfOption<VertexResolver> VERTEX_RESOLVER_CLASS =
+ ClassConfOption.create("giraph.vertexResolverClass",
+ DefaultVertexResolver.class, VertexResolver.class);
+
/**
* Whether to create vertices that did not previously exist but have
* received messages
*/
- String RESOLVER_CREATE_VERTEX_ON_MSGS =
- "giraph.vertex.resolver.create.on.msgs";
+ BooleanConfOption RESOLVER_CREATE_VERTEX_ON_MSGS =
+ new BooleanConfOption("giraph.vertex.resolver.create.on.msgs", true);
/** Graph partitioner factory class - optional */
- String GRAPH_PARTITIONER_FACTORY_CLASS =
- "giraph.graphPartitionerFactoryClass";
+ ClassConfOption<GraphPartitionerFactory> GRAPH_PARTITIONER_FACTORY_CLASS =
+ ClassConfOption.create("giraph.graphPartitionerFactoryClass",
+ HashPartitionerFactory.class, GraphPartitionerFactory.class);
/** Observer class to watch over job status - optional */
- String JOB_OBSERVER_CLASS = "giraph.jobObserverClass";
+ ClassConfOption<GiraphJobObserver> JOB_OBSERVER_CLASS =
+ ClassConfOption.create("giraph.jobObserverClass",
+ DefaultJobObserver.class, GiraphJobObserver.class);
// At least one of the input format classes is required.
/** VertexInputFormat class */
- String VERTEX_INPUT_FORMAT_CLASS = "giraph.vertexInputFormatClass";
+ ClassConfOption<VertexInputFormat> VERTEX_INPUT_FORMAT_CLASS =
+ ClassConfOption.create("giraph.vertexInputFormatClass", null,
+ VertexInputFormat.class);
/** EdgeInputFormat class */
- String EDGE_INPUT_FORMAT_CLASS = "giraph.edgeInputFormatClass";
+ ClassConfOption<EdgeInputFormat> EDGE_INPUT_FORMAT_CLASS =
+ ClassConfOption.create("giraph.edgeInputFormatClass", null,
+ EdgeInputFormat.class);
/** VertexOutputFormat class */
- String VERTEX_OUTPUT_FORMAT_CLASS = "giraph.vertexOutputFormatClass";
+ ClassConfOption<VertexOutputFormat> VERTEX_OUTPUT_FORMAT_CLASS =
+ ClassConfOption.create("giraph.vertexOutputFormatClass", null,
+ VertexOutputFormat.class);
/** Output Format Path (for Giraph-on-YARN) */
String GIRAPH_OUTPUT_DIR = "giraph.output.dir";
/** Vertex index class */
- String VERTEX_ID_CLASS = "giraph.vertexIdClass";
+ ClassConfOption<WritableComparable> VERTEX_ID_CLASS =
+ ClassConfOption.create("giraph.vertexIdClass", null,
+ WritableComparable.class);
/** Vertex value class */
- String VERTEX_VALUE_CLASS = "giraph.vertexValueClass";
+ ClassConfOption<Writable> VERTEX_VALUE_CLASS =
+ ClassConfOption.create("giraph.vertexValueClass", null, Writable.class);
/** Edge value class */
- String EDGE_VALUE_CLASS = "giraph.edgeValueClass";
+ ClassConfOption<Writable> EDGE_VALUE_CLASS =
+ ClassConfOption.create("giraph.edgeValueClass", null, Writable.class);
/** Message value class */
- String MESSAGE_VALUE_CLASS = "giraph.messageValueClass";
+ ClassConfOption<Writable> MESSAGE_VALUE_CLASS =
+ ClassConfOption.create("giraph.messageValueClass", null, Writable.class);
/** Partition context class */
- String PARTITION_CONTEXT_CLASS = "giraph.partitionContextClass";
+ ClassConfOption<PartitionContext> PARTITION_CONTEXT_CLASS =
+ ClassConfOption.create("giraph.partitionContextClass",
+ DefaultPartitionContext.class, PartitionContext.class);
/** Worker context class */
- String WORKER_CONTEXT_CLASS = "giraph.workerContextClass";
+ ClassConfOption<WorkerContext> WORKER_CONTEXT_CLASS =
+ ClassConfOption.create("giraph.workerContextClass",
+ DefaultWorkerContext.class, WorkerContext.class);
/** AggregatorWriter class - optional */
- String AGGREGATOR_WRITER_CLASS = "giraph.aggregatorWriterClass";
+ ClassConfOption<AggregatorWriter> AGGREGATOR_WRITER_CLASS =
+ ClassConfOption.create("giraph.aggregatorWriterClass",
+ TextAggregatorWriter.class, AggregatorWriter.class);
/** Partition class - optional */
- String PARTITION_CLASS = "giraph.partitionClass";
+ ClassConfOption<Partition> PARTITION_CLASS =
+ ClassConfOption.create("giraph.partitionClass", SimplePartition.class,
+ Partition.class);
/**
* Minimum number of simultaneous workers before this job can run (int)
@@ -97,54 +172,42 @@ public interface GiraphConstants {
* Separate the workers and the master tasks. This is required
* to support dynamic recovery. (boolean)
*/
- String SPLIT_MASTER_WORKER = "giraph.SplitMasterWorker";
- /**
- * Default on whether to separate the workers and the master tasks.
- * Needs to be "true" to support dynamic recovery.
- */
- boolean SPLIT_MASTER_WORKER_DEFAULT = true;
+ BooleanConfOption SPLIT_MASTER_WORKER =
+ new BooleanConfOption("giraph.SplitMasterWorker", true);
/** Indicates whether this job is run in an internal unit test */
- String LOCAL_TEST_MODE = "giraph.localTestMode";
-
- /** not in local test mode per default */
- boolean LOCAL_TEST_MODE_DEFAULT = false;
+ BooleanConfOption LOCAL_TEST_MODE =
+ new BooleanConfOption("giraph.localTestMode", false);
/** Override the Hadoop log level and set the desired log level. */
- String LOG_LEVEL = "giraph.logLevel";
- /** Default log level is INFO (same as Hadoop) */
- String LOG_LEVEL_DEFAULT = "info";
+ StrConfOption LOG_LEVEL = new StrConfOption("giraph.logLevel", "info");
/** Use thread level debugging? */
- String LOG_THREAD_LAYOUT = "giraph.logThreadLayout";
- /** Default to not use thread-level debugging */
- boolean LOG_THREAD_LAYOUT_DEFAULT = false;
+ BooleanConfOption LOG_THREAD_LAYOUT =
+ new BooleanConfOption("giraph.logThreadLayout", false);
/** Configuration key to enable jmap printing */
- String JMAP_ENABLE = "giraph.jmap.histo.enable";
- /** Default value for enabling jmap */
- boolean JMAP_ENABLE_DEFAULT = false;
+ BooleanConfOption JMAP_ENABLE =
+ new BooleanConfOption("giraph.jmap.histo.enable", false);
/** Configuration key for msec to sleep between calls */
- String JMAP_SLEEP_MILLIS = "giraph.jmap.histo.msec";
- /** Default msec to sleep between calls */
- int JMAP_SLEEP_MILLIS_DEFAULT = 30000;
+ IntConfOption JMAP_SLEEP_MILLIS =
+ new IntConfOption("giraph.jmap.histo.msec", SECONDS.toMillis(30));
/** Configuration key for how many lines to print */
- String JMAP_PRINT_LINES = "giraph.jmap.histo.print_lines";
- /** Default lines of output to print */
- int JMAP_PRINT_LINES_DEFAULT = 30;
+ IntConfOption JMAP_PRINT_LINES =
+ new IntConfOption("giraph.jmap.histo.print_lines", 30);
/**
* Minimum percent of the maximum number of workers that have responded
* in order to continue progressing. (float)
*/
- String MIN_PERCENT_RESPONDED = "giraph.minPercentResponded";
- /** Default 100% response rate for workers */
- float MIN_PERCENT_RESPONDED_DEFAULT = 100.0f;
+ FloatConfOption MIN_PERCENT_RESPONDED =
+ new FloatConfOption("giraph.minPercentResponded", 100.0f);
/** Enable the Metrics system **/
- String METRICS_ENABLE = "giraph.metrics.enable";
+ BooleanConfOption METRICS_ENABLE =
+ new BooleanConfOption("giraph.metrics.enable", false);
/**
* ZooKeeper comma-separated list (if not set,
@@ -153,24 +216,20 @@ public interface GiraphConstants {
String ZOOKEEPER_LIST = "giraph.zkList";
/** ZooKeeper session millisecond timeout */
- String ZOOKEEPER_SESSION_TIMEOUT = "giraph.zkSessionMsecTimeout";
- /** Default Zookeeper session millisecond timeout */
- int ZOOKEEPER_SESSION_TIMEOUT_DEFAULT = 60 * 1000;
+ IntConfOption ZOOKEEPER_SESSION_TIMEOUT =
+ new IntConfOption("giraph.zkSessionMsecTimeout", MINUTES.toMillis(1));
/** Polling interval to check for the ZooKeeper server data */
- String ZOOKEEPER_SERVERLIST_POLL_MSECS = "giraph.zkServerlistPollMsecs";
- /** Default polling interval to check for the ZooKeeper server data */
- int ZOOKEEPER_SERVERLIST_POLL_MSECS_DEFAULT = 3 * 1000;
+ IntConfOption ZOOKEEPER_SERVERLIST_POLL_MSECS =
+ new IntConfOption("giraph.zkServerlistPollMsecs", SECONDS.toMillis(3));
/** Number of nodes (not tasks) to run Zookeeper on */
- String ZOOKEEPER_SERVER_COUNT = "giraph.zkServerCount";
- /** Default number of nodes to run Zookeeper on */
- int ZOOKEEPER_SERVER_COUNT_DEFAULT = 1;
+ IntConfOption ZOOKEEPER_SERVER_COUNT =
+ new IntConfOption("giraph.zkServerCount", 1);
/** ZooKeeper port to use */
- String ZOOKEEPER_SERVER_PORT = "giraph.zkServerPort";
- /** Default ZooKeeper port to use */
- int ZOOKEEPER_SERVER_PORT_DEFAULT = 22181;
+ IntConfOption ZOOKEEPER_SERVER_PORT =
+ new IntConfOption("giraph.zkServerPort", 22181);
/** Location of the ZooKeeper jar - Used internally, not meant for users */
String ZOOKEEPER_JAR = "giraph.zkJar";
@@ -179,187 +238,130 @@ public interface GiraphConstants {
String ZOOKEEPER_DIR = "giraph.zkDir";
/** Max attempts for handling ZooKeeper connection loss */
- String ZOOKEEPER_OPS_MAX_ATTEMPTS = "giraph.zkOpsMaxAttempts";
- /** Default of 3 attempts for handling ZooKeeper connection loss */
- int ZOOKEEPER_OPS_MAX_ATTEMPTS_DEFAULT = 3;
+ IntConfOption ZOOKEEPER_OPS_MAX_ATTEMPTS =
+ new IntConfOption("giraph.zkOpsMaxAttempts", 3);
/**
- * Msecs to wait before retrying a failed ZooKeeper op due to connection
- * loss.
+ * Msecs to wait before retrying a failed ZooKeeper op due to connection loss.
*/
- String ZOOKEEPER_OPS_RETRY_WAIT_MSECS = "giraph.zkOpsRetryWaitMsecs";
- /**
- * Default to wait 5 seconds before retrying a failed ZooKeeper op due to
- * connection loss.
- */
- int ZOOKEEPER_OPS_RETRY_WAIT_MSECS_DEFAULT = 5 * 1000;
+ IntConfOption ZOOKEEPER_OPS_RETRY_WAIT_MSECS =
+ new IntConfOption("giraph.zkOpsRetryWaitMsecs", SECONDS.toMillis(5));
/** TCP backlog (defaults to number of workers) */
- String TCP_BACKLOG = "giraph.tcpBacklog";
- /**
- * Default TCP backlog default if the number of workers is not specified
- * (i.e unittests)
- */
- int TCP_BACKLOG_DEFAULT = 1;
+ IntConfOption TCP_BACKLOG = new IntConfOption("giraph.tcpBacklog", 1);
- /** How big to make the default buffer? */
- String NETTY_REQUEST_ENCODER_BUFFER_SIZE =
- "giraph.nettyRequestEncoderBufferSize";
- /** Start with 32K */
- int NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT = 32 * 1024;
+ /** How big to make the encoder buffer? */
+ IntConfOption NETTY_REQUEST_ENCODER_BUFFER_SIZE =
+ new IntConfOption("giraph.nettyRequestEncoderBufferSize", 32 * ONE_KB);
/** Whether or not netty request encoder should use direct byte buffers */
- String NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS =
- "giraph.nettyRequestEncoderUseDirectBuffers";
- /**
- * By default don't use direct buffers,
- * since jobs can take more than allowed heap memory in that case
- */
- boolean NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS_DEFAULT = false;
+ BooleanConfOption NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS =
+ new BooleanConfOption("giraph.nettyRequestEncoderUseDirectBuffers",
+ false);
/** Netty client threads */
- String NETTY_CLIENT_THREADS = "giraph.nettyClientThreads";
- /** Default is 4 */
- int NETTY_CLIENT_THREADS_DEFAULT = 4;
+ IntConfOption NETTY_CLIENT_THREADS =
+ new IntConfOption("giraph.nettyClientThreads", 4);
/** Netty server threads */
- String NETTY_SERVER_THREADS = "giraph.nettyServerThreads";
- /** Default is 16 */
- int NETTY_SERVER_THREADS_DEFAULT = 16;
+ IntConfOption NETTY_SERVER_THREADS =
+ new IntConfOption("giraph.nettyServerThreads", 16);
/** Use the execution handler in netty on the client? */
- String NETTY_CLIENT_USE_EXECUTION_HANDLER =
- "giraph.nettyClientUseExecutionHandler";
- /** Use the execution handler in netty on the client - default true */
- boolean NETTY_CLIENT_USE_EXECUTION_HANDLER_DEFAULT = true;
+ BooleanConfOption NETTY_CLIENT_USE_EXECUTION_HANDLER =
+ new BooleanConfOption("giraph.nettyClientUseExecutionHandler", true);
/** Netty client execution threads (execution handler) */
- String NETTY_CLIENT_EXECUTION_THREADS =
- "giraph.nettyClientExecutionThreads";
- /** Default Netty client execution threads (execution handler) of 8 */
- int NETTY_CLIENT_EXECUTION_THREADS_DEFAULT = 8;
+ IntConfOption NETTY_CLIENT_EXECUTION_THREADS =
+ new IntConfOption("giraph.nettyClientExecutionThreads", 8);
/** Where to place the netty client execution handler? */
- String NETTY_CLIENT_EXECUTION_AFTER_HANDLER =
- "giraph.nettyClientExecutionAfterHandler";
- /**
- * Default is to use the netty client execution handle after the request
- * encoder.
- */
- String NETTY_CLIENT_EXECUTION_AFTER_HANDLER_DEFAULT = "requestEncoder";
+ StrConfOption NETTY_CLIENT_EXECUTION_AFTER_HANDLER =
+ new StrConfOption("giraph.nettyClientExecutionAfterHandler",
+ "requestEncoder");
/** Use the execution handler in netty on the server? */
- String NETTY_SERVER_USE_EXECUTION_HANDLER =
- "giraph.nettyServerUseExecutionHandler";
- /** Use the execution handler in netty on the server - default true */
- boolean NETTY_SERVER_USE_EXECUTION_HANDLER_DEFAULT = true;
+ BooleanConfOption NETTY_SERVER_USE_EXECUTION_HANDLER =
+ new BooleanConfOption("giraph.nettyServerUseExecutionHandler", true);
/** Netty server execution threads (execution handler) */
- String NETTY_SERVER_EXECUTION_THREADS = "giraph.nettyServerExecutionThreads";
- /** Default Netty server execution threads (execution handler) of 8 */
- int NETTY_SERVER_EXECUTION_THREADS_DEFAULT = 8;
+ IntConfOption NETTY_SERVER_EXECUTION_THREADS =
+ new IntConfOption("giraph.nettyServerExecutionThreads", 8);
/** Where to place the netty server execution handler? */
- String NETTY_SERVER_EXECUTION_AFTER_HANDLER =
- "giraph.nettyServerExecutionAfterHandler";
- /**
- * Default is to use the netty server execution handle after the request
- * frame decoder.
- */
- String NETTY_SERVER_EXECUTION_AFTER_HANDLER_DEFAULT = "requestFrameDecoder";
+ StrConfOption NETTY_SERVER_EXECUTION_AFTER_HANDLER =
+ new StrConfOption("giraph.nettyServerExecutionAfterHandler",
+ "requestFrameDecoder");
/** Netty simulate a first request closed */
- String NETTY_SIMULATE_FIRST_REQUEST_CLOSED =
- "giraph.nettySimulateFirstRequestClosed";
- /** Default of not simulating failure for first request */
- boolean NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT = false;
+ BooleanConfOption NETTY_SIMULATE_FIRST_REQUEST_CLOSED =
+ new BooleanConfOption("giraph.nettySimulateFirstRequestClosed", false);
/** Netty simulate a first response failed */
- String NETTY_SIMULATE_FIRST_RESPONSE_FAILED =
- "giraph.nettySimulateFirstResponseFailed";
- /** Default of not simulating failure for first reponse */
- boolean NETTY_SIMULATE_FIRST_RESPONSE_FAILED_DEFAULT = false;
+ BooleanConfOption NETTY_SIMULATE_FIRST_RESPONSE_FAILED =
+ new BooleanConfOption("giraph.nettySimulateFirstResponseFailed", false);
/** Max resolve address attempts */
- String MAX_RESOLVE_ADDRESS_ATTEMPTS = "giraph.maxResolveAddressAttempts";
- /** Default max resolve address attempts */
- int MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT = 5;
+ IntConfOption MAX_RESOLVE_ADDRESS_ATTEMPTS =
+ new IntConfOption("giraph.maxResolveAddressAttempts", 5);
/** Msecs to wait between waiting for all requests to finish */
- String WAITING_REQUEST_MSECS = "giraph.waitingRequestMsecs";
- /** Default msecs to wait between waiting for all requests to finish */
- int WAITING_REQUEST_MSECS_DEFAULT = 15000;
+ IntConfOption WAITING_REQUEST_MSECS =
+ new IntConfOption("giraph.waitingRequestMsecs", SECONDS.toMillis(15));
/** Milliseconds to wait for an event before continuing */
- String EVENT_WAIT_MSECS = "giraph.eventWaitMsecs";
- /**
- * Default milliseconds to wait for an event before continuing (30 seconds)
- */
- int EVENT_WAIT_MSECS_DEFAULT = 30 * 1000;
+ IntConfOption EVENT_WAIT_MSECS =
+ new IntConfOption("giraph.eventWaitMsecs", SECONDS.toMillis(30));
/**
* Maximum milliseconds to wait before giving up trying to get the minimum
* number of workers before a superstep (int).
*/
- String MAX_MASTER_SUPERSTEP_WAIT_MSECS = "giraph.maxMasterSuperstepWaitMsecs";
- /**
- * Default maximum milliseconds to wait before giving up trying to get
- * the minimum number of workers before a superstep (10 minutes).
- */
- int MAX_MASTER_SUPERSTEP_WAIT_MSECS_DEFAULT = 10 * 60 * 1000;
+ IntConfOption MAX_MASTER_SUPERSTEP_WAIT_MSECS =
+ new IntConfOption("giraph.maxMasterSuperstepWaitMsecs",
+ MINUTES.toMillis(10));
/** Milliseconds for a request to complete (or else resend) */
- String MAX_REQUEST_MILLISECONDS = "giraph.maxRequestMilliseconds";
- /** Maximum number of milliseconds for a request to complete (10 minutes) */
- int MAX_REQUEST_MILLISECONDS_DEFAULT = 10 * 60 * 1000;
+ IntConfOption MAX_REQUEST_MILLISECONDS =
+ new IntConfOption("giraph.maxRequestMilliseconds", MINUTES.toMillis(10));
/** Netty max connection failures */
- String NETTY_MAX_CONNECTION_FAILURES = "giraph.nettyMaxConnectionFailures";
- /** Default Netty max connection failures */
- int NETTY_MAX_CONNECTION_FAILURES_DEFAULT = 1000;
+ IntConfOption NETTY_MAX_CONNECTION_FAILURES =
+ new IntConfOption("giraph.nettyMaxConnectionFailures", 1000);
/** Initial port to start using for the IPC communication */
- String IPC_INITIAL_PORT = "giraph.ipcInitialPort";
- /** Default port to start using for the IPC communication */
- int IPC_INITIAL_PORT_DEFAULT = 30000;
+ IntConfOption IPC_INITIAL_PORT =
+ new IntConfOption("giraph.ipcInitialPort", 30000);
/** Maximum bind attempts for different IPC ports */
- String MAX_IPC_PORT_BIND_ATTEMPTS = "giraph.maxIpcPortBindAttempts";
- /** Default maximum bind attempts for different IPC ports */
- int MAX_IPC_PORT_BIND_ATTEMPTS_DEFAULT = 20;
+ IntConfOption MAX_IPC_PORT_BIND_ATTEMPTS =
+ new IntConfOption("giraph.maxIpcPortBindAttempts", 20);
/**
* Fail first IPC port binding attempt, simulate binding failure
* on real grid testing
*/
- String FAIL_FIRST_IPC_PORT_BIND_ATTEMPT =
- "giraph.failFirstIpcPortBindAttempt";
- /** Default fail first IPC port binding attempt flag */
- boolean FAIL_FIRST_IPC_PORT_BIND_ATTEMPT_DEFAULT = false;
+ BooleanConfOption FAIL_FIRST_IPC_PORT_BIND_ATTEMPT =
+ new BooleanConfOption("giraph.failFirstIpcPortBindAttempt", false);
/** Client send buffer size */
- String CLIENT_SEND_BUFFER_SIZE = "giraph.clientSendBufferSize";
- /** Default client send buffer size of 0.5 MB */
- int DEFAULT_CLIENT_SEND_BUFFER_SIZE = 512 * 1024;
+ IntConfOption CLIENT_SEND_BUFFER_SIZE =
+ new IntConfOption("giraph.clientSendBufferSize", 512 * ONE_KB);
/** Client receive buffer size */
- String CLIENT_RECEIVE_BUFFER_SIZE = "giraph.clientReceiveBufferSize";
- /** Default client receive buffer size of 32 k */
- int DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE = 32 * 1024;
+ IntConfOption CLIENT_RECEIVE_BUFFER_SIZE =
+ new IntConfOption("giraph.clientReceiveBufferSize", 32 * ONE_KB);
/** Server send buffer size */
- String SERVER_SEND_BUFFER_SIZE = "giraph.serverSendBufferSize";
- /** Default server send buffer size of 32 k */
- int DEFAULT_SERVER_SEND_BUFFER_SIZE = 32 * 1024;
+ IntConfOption SERVER_SEND_BUFFER_SIZE =
+ new IntConfOption("giraph.serverSendBufferSize", 32 * ONE_KB);
/** Server receive buffer size */
- String SERVER_RECEIVE_BUFFER_SIZE = "giraph.serverReceiveBufferSize";
- /** Default server receive buffer size of 0.5 MB */
- int DEFAULT_SERVER_RECEIVE_BUFFER_SIZE = 512 * 1024;
+ IntConfOption SERVER_RECEIVE_BUFFER_SIZE =
+ new IntConfOption("giraph.serverReceiveBufferSize", 512 * ONE_KB);
/** Maximum size of messages (in bytes) per peer before flush */
- String MAX_MSG_REQUEST_SIZE = "giraph.msgRequestSize";
- /** Default maximum size of messages per peer before flush of 0.5MB */
- int MAX_MSG_REQUEST_SIZE_DEFAULT = 512 * 1024;
+ IntConfOption MAX_MSG_REQUEST_SIZE =
+ new IntConfOption("giraph.msgRequestSize", 512 * ONE_KB);
/**
* How much bigger than the average per partition size to make initial per
@@ -368,34 +370,23 @@ public interface GiraphConstants {
* and a worker has P partitions, then its initial partition buffer size
* will be (M / P) * (1 + A).
*/
- String ADDITIONAL_MSG_REQUEST_SIZE =
- "giraph.additionalMsgRequestSize";
- /**
- * Default factor for how bigger should initial per partition buffers be
- * of 20%.
- */
- float ADDITIONAL_MSG_REQUEST_SIZE_DEFAULT = 0.2f;
+ FloatConfOption ADDITIONAL_MSG_REQUEST_SIZE =
+ new FloatConfOption("giraph.additionalMsgRequestSize", 0.2f);
/** Maximum size of edges (in bytes) per peer before flush */
- String MAX_EDGE_REQUEST_SIZE = "giraph.edgeRequestSize";
- /** Default maximum size of edges per peer before flush of 0.5MB */
- int MAX_EDGE_REQUEST_SIZE_DEFAULT = 512 * 1024;
+ IntConfOption MAX_EDGE_REQUEST_SIZE =
+ new IntConfOption("giraph.edgeRequestSize", 512 * ONE_KB);
/**
* Additional size (expressed as a ratio) of each per-partition buffer on
* top of the average size.
*/
- String ADDITIONAL_EDGE_REQUEST_SIZE =
- "giraph.additionalEdgeRequestSize";
- /**
- * Default additional per-partition buffer size.
- */
- float ADDITIONAL_EDGE_REQUEST_SIZE_DEFAULT = 0.2f;
+ FloatConfOption ADDITIONAL_EDGE_REQUEST_SIZE =
+ new FloatConfOption("giraph.additionalEdgeRequestSize", 0.2f);
/** Maximum number of mutations per partition before flush */
- String MAX_MUTATIONS_PER_REQUEST = "giraph.maxMutationsPerRequest";
- /** Default maximum number of mutations per partition before flush */
- int MAX_MUTATIONS_PER_REQUEST_DEFAULT = 100;
+ IntConfOption MAX_MUTATIONS_PER_REQUEST =
+ new IntConfOption("giraph.maxMutationsPerRequest", 100);
/**
* Whether we should reuse the same Edge object when adding edges from
@@ -403,59 +394,38 @@ public interface GiraphConstants {
* This works with edge storage implementations that don't keep references
* to the input Edge objects (e.g., ByteArrayVertex).
*/
- String REUSE_INCOMING_EDGE_OBJECTS = "giraph.reuseIncomingEdgeObjects";
- /**
- * Default is to not reuse edge objects (since it's not compatible with
- * all storage implementations).
- */
- boolean REUSE_INCOMING_EDGE_OBJECTS_DEFAULT = false;
+ BooleanConfOption REUSE_INCOMING_EDGE_OBJECTS =
+ new BooleanConfOption("giraph.reuseIncomingEdgeObjects", false);
/**
* Use message size encoding (typically better for complex objects,
* not meant for primitive wrapped messages)
*/
- String USE_MESSAGE_SIZE_ENCODING = "giraph.useMessageSizeEncoding";
- /**
- * By default, do not use message size encoding as it is experimental.
- */
- boolean USE_MESSAGE_SIZE_ENCODING_DEFAULT = false;
+ BooleanConfOption USE_MESSAGE_SIZE_ENCODING =
+ new BooleanConfOption("giraph.useMessageSizeEncoding", false);
/** Number of channels used per server */
- String CHANNELS_PER_SERVER = "giraph.channelsPerServer";
- /** Default number of channels used per server of 1 */
- int DEFAULT_CHANNELS_PER_SERVER = 1;
+ IntConfOption CHANNELS_PER_SERVER =
+ new IntConfOption("giraph.channelsPerServer", 1);
/** Number of flush threads per peer */
String MSG_NUM_FLUSH_THREADS = "giraph.msgNumFlushThreads";
/** Number of threads for vertex computation */
- String NUM_COMPUTE_THREADS = "giraph.numComputeThreads";
- /** Default number of threads for vertex computation */
- int NUM_COMPUTE_THREADS_DEFAULT = 1;
+ IntConfOption NUM_COMPUTE_THREADS =
+ new IntConfOption("giraph.numComputeThreads", 1);
/** Number of threads for input splits loading */
- String NUM_INPUT_SPLITS_THREADS = "giraph.numInputSplitsThreads";
- /** Default number of threads for input splits loading */
- int NUM_INPUT_SPLITS_THREADS_DEFAULT = 1;
+ IntConfOption NUM_INPUT_SPLITS_THREADS =
+ new IntConfOption("giraph.numInputSplitsThreads", 1);
/** Minimum stragglers of the superstep before printing them out */
- String PARTITION_LONG_TAIL_MIN_PRINT = "giraph.partitionLongTailMinPrint";
- /** Only print stragglers with one as a default */
- int PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT = 1;
+ IntConfOption PARTITION_LONG_TAIL_MIN_PRINT =
+ new IntConfOption("giraph.partitionLongTailMinPrint", 1);
/** Use superstep counters? (boolean) */
- String USE_SUPERSTEP_COUNTERS = "giraph.useSuperstepCounters";
- /** Default is to use the superstep counters */
- boolean USE_SUPERSTEP_COUNTERS_DEFAULT = true;
-
- /**
- * Set the multiplicative factor of how many partitions to create from
- * a single InputSplit based on the number of total InputSplits. For
- * example, if there are 10 total InputSplits and this is set to 0.5, then
- * you will get 0.5 * 10 = 5 partitions for every InputSplit (given that the
- * minimum size is met).
- */
- String TOTAL_INPUT_SPLIT_MULTIPLIER = "giraph.totalInputSplitMultiplier";
+ BooleanConfOption USE_SUPERSTEP_COUNTERS =
+ new BooleanConfOption("giraph.useSuperstepCounters", true);
/**
* Input split sample percent - Used only for sampling and testing, rather
@@ -463,31 +433,24 @@ public interface GiraphConstants {
* fraction of the actual input splits from your VertexInputFormat to
* load (values should be [0, 100]).
*/
- String INPUT_SPLIT_SAMPLE_PERCENT = "giraph.inputSplitSamplePercent";
- /** Default is to use all the input splits */
- float INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT = 100f;
+ FloatConfOption INPUT_SPLIT_SAMPLE_PERCENT =
+ new FloatConfOption("giraph.inputSplitSamplePercent", 100f);
/**
* To limit outlier vertex input splits from producing too many vertices or
* to help with testing, the number of vertices loaded from an input split
* can be limited. By default, everything is loaded.
*/
- String INPUT_SPLIT_MAX_VERTICES = "giraph.InputSplitMaxVertices";
- /**
- * Default is that all the vertices are to be loaded from the input split
- */
- long INPUT_SPLIT_MAX_VERTICES_DEFAULT = -1;
+ LongConfOption INPUT_SPLIT_MAX_VERTICES =
+ new LongConfOption("giraph.InputSplitMaxVertices", -1);
/**
* To limit outlier vertex input splits from producing too many vertices or
* to help with testing, the number of edges loaded from an input split
* can be limited. By default, everything is loaded.
*/
- String INPUT_SPLIT_MAX_EDGES = "giraph.InputSplitMaxEdges";
- /**
- * Default is that all the edges are to be loaded from the input split
- */
- long INPUT_SPLIT_MAX_EDGES_DEFAULT = -1;
+ LongConfOption INPUT_SPLIT_MAX_EDGES =
+ new LongConfOption("giraph.InputSplitMaxEdges", -1);
/**
* To minimize network usage when reading input splits,
@@ -496,24 +459,16 @@ public interface GiraphConstants {
* Hence, users with a lot of splits and input threads (or with
* configurations that can't exploit locality) may want to disable it.
*/
- String USE_INPUT_SPLIT_LOCALITY = "giraph.useInputSplitLocality";
-
- /**
- * Default is to prioritize local input splits.
- */
- boolean USE_INPUT_SPLIT_LOCALITY_DEFAULT = true;
+ BooleanConfOption USE_INPUT_SPLIT_LOCALITY =
+ new BooleanConfOption("giraph.useInputSplitLocality", true);
/** Multiplier for the current workers squared */
- String PARTITION_COUNT_MULTIPLIER =
- "partition.masterPartitionCountMultipler";
- /** Default mulitplier for current workers squared */
- float DEFAULT_PARTITION_COUNT_MULTIPLIER = 1.0f;
+ FloatConfOption PARTITION_COUNT_MULTIPLIER =
+ new FloatConfOption("partition.masterPartitionCountMultipler", 1.0f);
/** Overrides default partition count calculation if not -1 */
- String USER_PARTITION_COUNT =
- "partition.userPartitionCount";
- /** Default user partition count */
- int DEFAULT_USER_PARTITION_COUNT = -1;
+ IntConfOption USER_PARTITION_COUNT =
+ new IntConfOption("partition.userPartitionCount", -1);
/** Vertex key space size for
* {@link org.apache.giraph.partition.SimpleRangeWorkerPartitioner}
@@ -521,28 +476,23 @@ public interface GiraphConstants {
String PARTITION_VERTEX_KEY_SPACE_SIZE = "partition.vertexKeySpaceSize";
/** Java opts passed to ZooKeeper startup */
- String ZOOKEEPER_JAVA_OPTS = "giraph.zkJavaOpts";
- /** Default java opts passed to ZooKeeper startup */
- String ZOOKEEPER_JAVA_OPTS_DEFAULT =
- "-Xmx512m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC " +
- "-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxGCPauseMillis=100";
+ StrConfOption ZOOKEEPER_JAVA_OPTS =
+ new StrConfOption("giraph.zkJavaOpts",
+ "-Xmx512m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC " +
+ "-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxGCPauseMillis=100");
/**
* How often to checkpoint (i.e. 0 means no checkpointing,
* 1 means every superstep, 2 means every two supersteps, etc.).
*/
- String CHECKPOINT_FREQUENCY = "giraph.checkpointFrequency";
-
- /** Default checkpointing frequency of none. */
- int CHECKPOINT_FREQUENCY_DEFAULT = 0;
+ IntConfOption CHECKPOINT_FREQUENCY =
+ new IntConfOption("giraph.checkpointFrequency", 0);
/**
* Delete checkpoints after a successful job run?
*/
- String CLEANUP_CHECKPOINTS_AFTER_SUCCESS =
- "giraph.cleanupCheckpointsAfterSuccess";
- /** Default is to clean up the checkponts after a successful job */
- boolean CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT = true;
+ BooleanConfOption CLEANUP_CHECKPOINTS_AFTER_SUCCESS =
+ new BooleanConfOption("giraph.cleanupCheckpointsAfterSuccess", true);
/**
* An application can be restarted manually by selecting a superstep. The
@@ -561,76 +511,56 @@ public interface GiraphConstants {
* If ZOOKEEPER_LIST is not set, then use this directory to manage
* ZooKeeper
*/
- String ZOOKEEPER_MANAGER_DIRECTORY = "giraph.zkManagerDirectory";
- /**
- * Default ZooKeeper manager directory (where determining the servers in
- * HDFS files will go). directory path will also have job number
- * for uniqueness.
- */
- String ZOOKEEPER_MANAGER_DIR_DEFAULT = "_bsp/_defaultZkManagerDir";
+ StrConfOption ZOOKEEPER_MANAGER_DIRECTORY =
+ new StrConfOption("giraph.zkManagerDirectory",
+ "_bsp/_defaultZkManagerDir");
/** Number of ZooKeeper client connection attempts before giving up. */
- String ZOOKEEPER_CONNECTION_ATTEMPTS = "giraph.zkConnectionAttempts";
- /** Default of 10 ZooKeeper client connection attempts before giving up. */
- int ZOOKEEPER_CONNECTION_ATTEMPTS_DEFAULT = 10;
+ IntConfOption ZOOKEEPER_CONNECTION_ATTEMPTS =
+ new IntConfOption("giraph.zkConnectionAttempts", 10);
/** This directory has/stores the available checkpoint files in HDFS. */
- String CHECKPOINT_DIRECTORY = "giraph.checkpointDirectory";
- /**
- * Default checkpoint directory (where checkpoing files go in HDFS). Final
- * directory path will also have the job number for uniqueness
- */
- String CHECKPOINT_DIRECTORY_DEFAULT = "_bsp/_checkpoints/";
+ StrConfOption CHECKPOINT_DIRECTORY =
+ new StrConfOption("giraph.checkpointDirectory", "_bsp/_checkpoints/");
/**
* Comma-separated list of directories in the local file system for
* out-of-core messages.
*/
- String MESSAGES_DIRECTORY = "giraph.messagesDirectory";
- /**
- * Default messages directory. directory path will also have the
- * job number for uniqueness
- */
- String MESSAGES_DIRECTORY_DEFAULT = "_bsp/_messages/";
+ StrConfOption MESSAGES_DIRECTORY =
+ new StrConfOption("giraph.messagesDirectory", "_bsp/_messages/");
/** Whether or not to use out-of-core messages */
- String USE_OUT_OF_CORE_MESSAGES = "giraph.useOutOfCoreMessages";
- /** Default choice about using out-of-core messaging */
- boolean USE_OUT_OF_CORE_MESSAGES_DEFAULT = false;
+ BooleanConfOption USE_OUT_OF_CORE_MESSAGES =
+ new BooleanConfOption("giraph.useOutOfCoreMessages", false);
/**
* If using out-of-core messaging, the maximum number of messages to keep
* in memory.
*/
- String MAX_MESSAGES_IN_MEMORY = "giraph.maxMessagesInMemory";
- /** Default maximum number of messages in memory. */
- int MAX_MESSAGES_IN_MEMORY_DEFAULT = 1000000;
+ IntConfOption MAX_MESSAGES_IN_MEMORY =
+ new IntConfOption("giraph.maxMessagesInMemory", 1000000);
/** Size of buffer when reading and writing messages out-of-core. */
- String MESSAGES_BUFFER_SIZE = "giraph.messagesBufferSize";
- /** Default size of buffer when reading and writing messages out-of-core. */
- int MESSAGES_BUFFER_SIZE_DEFAULT = 8192;
+ IntConfOption MESSAGES_BUFFER_SIZE =
+ new IntConfOption("giraph.messagesBufferSize", 8 * ONE_KB);
/**
* Comma-separated list of directories in the local filesystem for
* out-of-core partitions.
*/
- String PARTITIONS_DIRECTORY = "giraph.partitionsDirectory";
- /** Default directory for out-of-core partitions. */
- String PARTITIONS_DIRECTORY_DEFAULT = "_bsp/_partitions";
+ StrConfOption PARTITIONS_DIRECTORY =
+ new StrConfOption("giraph.partitionsDirectory", "_bsp/_partitions");
/** Enable out-of-core graph. */
- String USE_OUT_OF_CORE_GRAPH = "giraph.useOutOfCoreGraph";
- /** Default is not to use out-of-core graph. */
- boolean USE_OUT_OF_CORE_GRAPH_DEFAULT = false;
+ BooleanConfOption USE_OUT_OF_CORE_GRAPH =
+ new BooleanConfOption("giraph.useOutOfCoreGraph", false);
/** Maximum number of partitions to hold in memory for each worker. */
- String MAX_PARTITIONS_IN_MEMORY = "giraph.maxPartitionsInMemory";
- /** Default maximum number of in-memory partitions. */
- int MAX_PARTITIONS_IN_MEMORY_DEFAULT = 10;
+ IntConfOption MAX_PARTITIONS_IN_MEMORY =
+ new IntConfOption("giraph.maxPartitionsInMemory", 10);
/** Keep the zookeeper output for debugging? Default is to remove it. */
- String KEEP_ZOOKEEPER_DATA = "giraph.keepZooKeeperData";
- /** Default is to remove ZooKeeper data. */
- Boolean KEEP_ZOOKEEPER_DATA_DEFAULT = false;
+ BooleanConfOption KEEP_ZOOKEEPER_DATA =
+ new BooleanConfOption("giraph.keepZooKeeperData", false);
/** Default ZooKeeper tick time. */
int DEFAULT_ZOOKEEPER_TICK_TIME = 6000;
@@ -643,57 +573,50 @@ public interface GiraphConstants {
/** Default ZooKeeper maximum client connections. */
int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 10000;
/** ZooKeeper minimum session timeout */
- String ZOOKEEPER_MIN_SESSION_TIMEOUT = "giraph.zKMinSessionTimeout";
- /** Default ZooKeeper minimum session timeout of 10 minutes (in msecs). */
- int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 600 * 1000;
+ IntConfOption ZOOKEEPER_MIN_SESSION_TIMEOUT =
+ new IntConfOption("giraph.zKMinSessionTimeout", MINUTES.toMillis(10));
/** ZooKeeper maximum session timeout */
- String ZOOKEEPER_MAX_SESSION_TIMEOUT = "giraph.zkMaxSessionTimeout";
- /** Default ZooKeeper maximum session timeout of 15 minutes (in msecs). */
- int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 900 * 1000;
+ IntConfOption ZOOKEEPER_MAX_SESSION_TIMEOUT =
+ new IntConfOption("giraph.zkMaxSessionTimeout", MINUTES.toMillis(15));
/** ZooKeeper force sync */
- String ZOOKEEPER_FORCE_SYNC = "giraph.zKForceSync";
- /** Default ZooKeeper force sync is off (for performance) */
- String DEFAULT_ZOOKEEPER_FORCE_SYNC = "no";
+ StrConfOption ZOOKEEPER_FORCE_SYNC =
+ new StrConfOption("giraph.zKForceSync", "no");
/** ZooKeeper skip ACLs */
- String ZOOKEEPER_SKIP_ACL = "giraph.ZkSkipAcl";
- /** Default ZooKeeper skip ACLs true (for performance) */
- String DEFAULT_ZOOKEEPER_SKIP_ACL = "yes";
+ StrConfOption ZOOKEEPER_SKIP_ACL =
+ new StrConfOption("giraph.ZkSkipAcl", "yes");
/**
* Whether to use SASL with DIGEST and Hadoop Job Tokens to authenticate
* and authorize Netty BSP Clients to Servers.
*/
- String AUTHENTICATE = "giraph.authenticate";
- /** Default is not to do authenticate and authorization with Netty. */
- boolean DEFAULT_AUTHENTICATE = false;
+ BooleanConfOption AUTHENTICATE =
+ new BooleanConfOption("giraph.authenticate", false);
/** Use unsafe serialization? */
- String USE_UNSAFE_SERIALIZATION = "giraph.useUnsafeSerialization";
- /**
- * Use unsafe serialization default is true (use it if you can,
- * its much faster)!
- */
- boolean USE_UNSAFE_SERIALIZATION_DEFAULT = true;
+ BooleanConfOption USE_UNSAFE_SERIALIZATION =
+ new BooleanConfOption("giraph.useUnsafeSerialization", true);
/**
* Maximum number of attempts a master/worker will retry before killing
* the job. This directly maps to the number of map task attempts in
* Hadoop.
*/
- String MAX_TASK_ATTEMPTS = "mapred.map.max.attempts";
+ IntConfOption MAX_TASK_ATTEMPTS =
+ new IntConfOption("mapred.map.max.attempts", -1);
/** Interface to use for hostname resolution */
- String DNS_INTERFACE = "giraph.dns.interface";
+ StrConfOption DNS_INTERFACE =
+ new StrConfOption("giraph.dns.interface", "default");
/** Server for hostname resolution */
- String DNS_NAMESERVER = "giraph.dns.nameserver";
+ StrConfOption DNS_NAMESERVER =
+ new StrConfOption("giraph.dns.nameserver", "default");
/**
* The application will halt after this many supersteps is completed. For
* instance, if it is set to 3, the application will run at most 0, 1,
* and 2 supersteps and then go into the shutdown superstep.
+ * The default of -1 means the number of supersteps is not limited.
*/
- String MAX_NUMBER_OF_SUPERSTEPS = "giraph.maxNumberOfSupersteps";
- /** By default, the number of supersteps is not limited */
- int MAX_NUMBER_OF_SUPERSTEPS_DEFAULT = -1;
+ IntConfOption MAX_NUMBER_OF_SUPERSTEPS =
+ new IntConfOption("giraph.maxNumberOfSupersteps", -1);
}
// CHECKSTYLE: resume InterfaceIsTypeCheck
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index de85ab6..0af8b97 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -53,6 +53,8 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;
+import static org.apache.giraph.conf.GiraphConstants.USE_UNSAFE_SERIALIZATION;
+
/**
* The classes set here are immutable, the remaining configuration is mutable.
* Classes are immutable and final to provide the best performance for
@@ -87,8 +89,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
public ImmutableClassesGiraphConfiguration(Configuration conf) {
super(conf);
classes = new GiraphClasses(conf);
- useUnsafeSerialization = getBoolean(USE_UNSAFE_SERIALIZATION,
- USE_UNSAFE_SERIALIZATION_DEFAULT);
+ useUnsafeSerialization = USE_UNSAFE_SERIALIZATION.get(this);
try {
vertexValueFactory = (VertexValueFactory<V>)
classes.getVertexValueFactoryClass().newInstance();
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java
new file mode 100644
index 0000000..de75e9d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Integer configuration option
+ */
+ public class IntConfOption extends AbstractConfOption {
+   /** Default value */
+   private final int defaultValue;
+
+   /**
+    * Constructor. Adds the new option to {@code AllOptions}.
+    *
+    * @param key key
+    * @param defaultValue default value
+    */
+   public IntConfOption(String key, int defaultValue) {
+     super(key);
+     this.defaultValue = defaultValue;
+     AllOptions.add(this);
+   }
+
+   /**
+    * Constructor taking the default as a long, for callers whose default
+    * comes from a long-valued expression (e.g. a time unit conversion).
+    * The value must fit in an int; it is no longer silently truncated.
+    *
+    * @param key key
+    * @param defaultValue default value, must be within the int range
+    * @throws IllegalArgumentException if defaultValue does not fit in an int
+    */
+   public IntConfOption(String key, long defaultValue) {
+     this(key, checkedIntCast(key, defaultValue));
+   }
+
+   /**
+    * Narrow a long to an int, rejecting values that would overflow.
+    *
+    * @param key key the value belongs to (used in the error message only)
+    * @param value value to narrow
+    * @return value as an int
+    * @throws IllegalArgumentException if value is outside the int range
+    */
+   private static int checkedIntCast(String key, long value) {
+     if (value < Integer.MIN_VALUE || value > Integer.MAX_VALUE) {
+       throw new IllegalArgumentException("IntConfOption: default value " +
+           value + " for " + key + " does not fit in an int");
+     }
+     return (int) value;
+   }
+
+   /**
+    * Get the default value of this option.
+    *
+    * @return default value
+    */
+   public int getDefaultValue() {
+     return defaultValue;
+   }
+
+   /**
+    * Get the default value rendered as a decimal string.
+    *
+    * @return default value as a string
+    */
+   @Override public String getDefaultValueStr() {
+     return Integer.toString(defaultValue);
+   }
+
+   /**
+    * Get the type of this option.
+    *
+    * @return {@link ConfOptionType#INTEGER}
+    */
+   @Override public ConfOptionType getType() {
+     return ConfOptionType.INTEGER;
+   }
+
+   /**
+    * Lookup value
+    *
+    * @param conf Configuration
+    * @return value for key, or default value if not set
+    */
+   public int get(Configuration conf) {
+     return conf.getInt(getKey(), defaultValue);
+   }
+
+   /**
+    * Set value
+    *
+    * @param conf Configuration
+    * @param value to set
+    */
+   public void set(Configuration conf, int value) {
+     conf.setInt(getKey(), value);
+   }
+
+   /**
+    * Set value if it's not already present
+    *
+    * @param conf Configuration
+    * @param value to set
+    */
+   public void setIfUnset(Configuration conf, int value) {
+     // Only write when the raw lookup finds nothing for the key.
+     if (conf.get(getKey()) == null) {
+       conf.setInt(getKey(), value);
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java
new file mode 100644
index 0000000..0cbc164
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Long configuration option
+ */
+ public class LongConfOption extends AbstractConfOption {
+   /** Default value, fixed at construction */
+   private final long defaultValue;
+
+   /**
+    * Constructor. Adds the new option to {@code AllOptions}.
+    *
+    * @param key key
+    * @param defaultValue default value
+    */
+   public LongConfOption(String key, long defaultValue) {
+     super(key);
+     this.defaultValue = defaultValue;
+     AllOptions.add(this);
+   }
+
+   /**
+    * Get the default value of this option.
+    *
+    * @return default value
+    */
+   public long getDefaultValue() {
+     return defaultValue;
+   }
+
+   /**
+    * Get the default value rendered as a decimal string.
+    *
+    * @return default value as a string
+    */
+   @Override public String getDefaultValueStr() {
+     return Long.toString(defaultValue);
+   }
+
+   /**
+    * Get the type of this option.
+    *
+    * @return {@link ConfOptionType#LONG}
+    */
+   @Override public ConfOptionType getType() {
+     return ConfOptionType.LONG;
+   }
+
+   /**
+    * Lookup value
+    *
+    * @param conf Configuration
+    * @return value set for key, or defaultValue
+    */
+   public long get(Configuration conf) {
+     return conf.getLong(getKey(), defaultValue);
+   }
+
+   /**
+    * Set value for key
+    *
+    * @param conf Configuration
+    * @param value to set
+    */
+   public void set(Configuration conf, long value) {
+     conf.setLong(getKey(), value);
+   }
+
+   /**
+    * Set value if it's not already present
+    *
+    * @param conf Configuration
+    * @param value to set
+    */
+   public void setIfUnset(Configuration conf, long value) {
+     // Only write when the raw lookup finds nothing for the key.
+     if (conf.get(getKey()) == null) {
+       conf.setLong(getKey(), value);
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java
new file mode 100644
index 0000000..83a583d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * String Configuration option
+ */
+ public class StrConfOption extends AbstractConfOption {
+   /** Fallback handed to the configuration lookups of this option */
+   private final String defaultValue;
+
+   /**
+    * Build a string option and add it to {@code AllOptions}.
+    *
+    * @param key configuration key
+    * @param defaultValue fallback used by the lookups of this option
+    */
+   public StrConfOption(String key, String defaultValue) {
+     super(key);
+     this.defaultValue = defaultValue;
+     AllOptions.add(this);
+   }
+
+   /**
+    * Accessor for the fallback value.
+    *
+    * @return the default value given at construction
+    */
+   public String getDefaultValue() {
+     return defaultValue;
+   }
+
+   /**
+    * The default is already a string, so it is returned unchanged.
+    *
+    * @return the default value given at construction
+    */
+   @Override public String getDefaultValueStr() {
+     return defaultValue;
+   }
+
+   /**
+    * Type tag of this option.
+    *
+    * @return {@link ConfOptionType#STRING}
+    */
+   @Override public ConfOptionType getType() {
+     return ConfOptionType.STRING;
+   }
+
+   /**
+    * Read this option from a configuration.
+    *
+    * @param conf Configuration to read from
+    * @return value for key, or defaultValue
+    */
+   public String get(Configuration conf) {
+     final String optionKey = getKey();
+     return conf.get(optionKey, defaultValue);
+   }
+
+   /**
+    * Read this option, substituting a caller-chosen fallback for the
+    * default of this option.
+    *
+    * @param conf Configuration to read from
+    * @param defaultVal fallback to use in place of this option's default
+    * @return value for key, or defaultVal passed in
+    */
+   public String getWithDefault(Configuration conf, String defaultVal) {
+     final String optionKey = getKey();
+     return conf.get(optionKey, defaultVal);
+   }
+
+   /**
+    * Read this option through {@code Configuration#getStrings}, passing
+    * this option's default as the fallback argument.
+    *
+    * @param conf Configuration to read from
+    * @return array of values for key
+    */
+   public String[] getArray(Configuration conf) {
+     final String[] values = conf.getStrings(getKey(), defaultValue);
+     return values;
+   }
+
+   /**
+    * Same lookup as {@link #getArray(Configuration)}, copied into a new
+    * list.
+    *
+    * @param conf Configuration to read from
+    * @return list of values for key
+    */
+   public List<String> getList(Configuration conf) {
+     final String[] values = getArray(conf);
+     return Lists.newArrayList(values);
+   }
+
+   /**
+    * Store a value under the key of this option.
+    *
+    * @param conf Configuration to write to
+    * @param value to set
+    */
+   public void set(Configuration conf, String value) {
+     final String optionKey = getKey();
+     conf.set(optionKey, value);
+   }
+
+   /**
+    * Store a value under the key of this option, delegating the
+    * "not already present" decision to {@code Configuration#setIfUnset}.
+    *
+    * @param conf Configuration to write to
+    * @param value to set
+    */
+   public void setIfUnset(Configuration conf, String value) {
+     final String optionKey = getKey();
+     conf.setIfUnset(optionKey, value);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index e74c59a..57f7dff 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -18,8 +18,6 @@
package org.apache.giraph.graph;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -57,6 +55,9 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URL;
@@ -72,6 +73,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import static org.apache.giraph.conf.GiraphConstants.EDGE_VALUE_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.MESSAGE_VALUE_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_ID_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_CLASS;
+
/**
* The Giraph-specific business logic for a single BSP
* compute node in whatever underlying type of cluster
@@ -440,18 +446,10 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
Type vertexValueType = classList.get(1);
Type edgeValueType = classList.get(2);
Type messageValueType = classList.get(3);
- conf.setClass(GiraphConstants.VERTEX_ID_CLASS,
- (Class<?>) vertexIndexType,
- WritableComparable.class);
- conf.setClass(GiraphConstants.VERTEX_VALUE_CLASS,
- (Class<?>) vertexValueType,
- Writable.class);
- conf.setClass(GiraphConstants.EDGE_VALUE_CLASS,
- (Class<?>) edgeValueType,
- Writable.class);
- conf.setClass(GiraphConstants.MESSAGE_VALUE_CLASS,
- (Class<?>) messageValueType,
- Writable.class);
+ VERTEX_ID_CLASS.set(conf, (Class<WritableComparable>) vertexIndexType);
+ VERTEX_VALUE_CLASS.set(conf, (Class<Writable>) vertexValueType);
+ EDGE_VALUE_CLASS.set(conf, (Class<Writable>) edgeValueType);
+ MESSAGE_VALUE_CLASS.set(conf, (Class<Writable>) messageValueType);
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
index 1e05773..f55cf18 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
@@ -21,7 +21,6 @@ package org.apache.giraph.job;
import org.apache.giraph.combiner.Combiner;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.edge.VertexEdges;
import org.apache.giraph.graph.DefaultVertexResolver;
import org.apache.giraph.graph.DefaultVertexValueFactory;
@@ -40,6 +39,9 @@ import org.apache.log4j.Logger;
import java.lang.reflect.Type;
import java.util.List;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_EDGES_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_RESOLVER_CLASS;
+
/**
* GiraphConfigurationValidator attempts to verify the consistency of
* user-chosen InputFormat, OutputFormat, and Vertex generic type
@@ -138,34 +140,36 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
conf.getMinPercentResponded() > 100.0f) {
throw new IllegalArgumentException(
"checkConfiguration: Invalid " + conf.getMinPercentResponded() +
- " for " + GiraphConstants.MIN_PERCENT_RESPONDED);
+ " for " + GiraphConstants.MIN_PERCENT_RESPONDED.getKey());
}
if (conf.getMinWorkers() < 0) {
throw new IllegalArgumentException("checkConfiguration: No valid " +
GiraphConstants.MIN_WORKERS);
}
if (conf.getVertexClass() == null) {
- throw new IllegalArgumentException("checkConfiguration: Null" +
- GiraphConstants.VERTEX_CLASS);
+ throw new IllegalArgumentException("checkConfiguration: Null " +
+ GiraphConstants.VERTEX_CLASS.getKey());
}
if (conf.getVertexInputFormatClass() == null &&
conf.getEdgeInputFormatClass() == null) {
throw new IllegalArgumentException("checkConfiguration: One of " +
- GiraphConstants.VERTEX_INPUT_FORMAT_CLASS + " and " +
- GiraphConstants.EDGE_INPUT_FORMAT_CLASS + " must be non-null");
+ GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.getKey() + " and " +
+ GiraphConstants.EDGE_INPUT_FORMAT_CLASS.getKey() +
+ " must be non-null");
}
if (conf.getVertexResolverClass() == null) {
if (LOG.isInfoEnabled()) {
LOG.info("checkConfiguration: No class found for " +
- GiraphConstants.VERTEX_RESOLVER_CLASS + ", defaulting to " +
- DefaultVertexResolver.class.getCanonicalName());
+ VERTEX_RESOLVER_CLASS.getKey() +
+ ", defaulting to " +
+ VERTEX_RESOLVER_CLASS.getDefaultClass().getCanonicalName());
}
}
if (conf.getVertexEdgesClass() == null) {
if (LOG.isInfoEnabled()) {
LOG.info("checkConfiguration: No class found for " +
- GiraphConstants.VERTEX_EDGES_CLASS + ", defaulting to " +
- ByteArrayEdges.class.getCanonicalName());
+ VERTEX_EDGES_CLASS.getKey() + ", defaulting to " +
+ VERTEX_EDGES_CLASS.getDefaultClass().getCanonicalName());
}
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
index da85d1c..6849ca3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
@@ -214,7 +214,7 @@ public class GiraphJob {
if (LOG.isInfoEnabled()) {
LOG.info("run: Since checkpointing is disabled (default), " +
"do not allow any task retries (setting " +
- GiraphConstants.MAX_TASK_ATTEMPTS + " = 0, " +
+ GiraphConstants.MAX_TASK_ATTEMPTS.getKey() + " = 0, " +
"old value = " + oldMaxTaskAttempts + ")");
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 6c979d6..404e47e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -104,6 +104,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT;
+import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA;
+import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT;
+import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY;
+
/**
* ZooKeeper-based implementation of {@link CentralizedServiceMaster}.
*
@@ -201,13 +206,10 @@ public class BspServiceMaster<I extends WritableComparable,
maxWorkers = conf.getMaxWorkers();
minWorkers = conf.getMinWorkers();
maxNumberOfSupersteps = conf.getMaxNumberOfSupersteps();
- minPercentResponded = conf.getFloat(
- GiraphConstants.MIN_PERCENT_RESPONDED, 100.0f);
+ minPercentResponded = GiraphConstants.MIN_PERCENT_RESPONDED.get(conf);
eventWaitMsecs = conf.getEventWaitMsecs();
maxSuperstepWaitMsecs = conf.getMaxMasterSuperstepWaitMsecs();
- partitionLongTailMinPrint = conf.getInt(
- GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT,
- GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT);
+ partitionLongTailMinPrint = PARTITION_LONG_TAIL_MIN_PRINT.get(conf);
masterGraphPartitioner =
getGraphPartitionerFactory().createMasterGraphPartitioner();
if (conf.isJMapHistogramDumpEnabled()) {
@@ -316,11 +318,8 @@ public class BspServiceMaster<I extends WritableComparable,
logPrefix + ": Got InterruptedException", e);
}
float samplePercent =
- getConfiguration().getFloat(
- GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT,
- GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT);
- if (samplePercent !=
- GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT) {
+ INPUT_SPLIT_SAMPLE_PERCENT.get(getConfiguration());
+ if (samplePercent != INPUT_SPLIT_SAMPLE_PERCENT.getDefaultValue()) {
int lastIndex = (int) (samplePercent * splits.size() / 100f);
List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
LOG.warn(logPrefix + ": Using sampling - Processing only " +
@@ -579,6 +578,7 @@ public class BspServiceMaster<I extends WritableComparable,
private int createInputSplits(GiraphInputFormat inputFormat,
InputSplitPaths inputSplitPaths,
String inputSplitType) {
+ ImmutableClassesGiraphConfiguration conf = getConfiguration();
String logPrefix = "create" + inputSplitType + "InputSplits";
// Only the 'master' should be doing this. Wait until the number of
// processes that have reported health exceeds the minimum percentage.
@@ -612,7 +612,7 @@ public class BspServiceMaster<I extends WritableComparable,
// Create at least as many splits as the total number of input threads.
int minSplitCountHint = healthyWorkerInfoList.size() *
- getConfiguration().getNumInputSplitsThreads();
+ conf.getNumInputSplitsThreads();
// Note that the input splits may only be a sample if
// INPUT_SPLIT_SAMPLE_PERCENT is set to something other than 100
@@ -635,8 +635,7 @@ public class BspServiceMaster<I extends WritableComparable,
}
// Write input splits to zookeeper in parallel
- int inputSplitThreadCount = getConfiguration().getInt(
- INPUT_SPLIT_THREAD_COUNT,
+ int inputSplitThreadCount = conf.getInt(INPUT_SPLIT_THREAD_COUNT,
DEFAULT_INPUT_SPLIT_THREAD_COUNT);
if (LOG.isInfoEnabled()) {
LOG.info(logPrefix + ": Starting to write input split data " +
@@ -644,9 +643,7 @@ public class BspServiceMaster<I extends WritableComparable,
}
ExecutorService taskExecutor =
Executors.newFixedThreadPool(inputSplitThreadCount);
- boolean writeLocations = getConfiguration().getBoolean(
- GiraphConstants.USE_INPUT_SPLIT_LOCALITY,
- GiraphConstants.USE_INPUT_SPLIT_LOCALITY_DEFAULT);
+ boolean writeLocations = USE_INPUT_SPLIT_LOCALITY.get(conf);
for (int i = 0; i < splitList.size(); ++i) {
InputSplit inputSplit = splitList.get(i);
taskExecutor.submit(new WriteInputSplit(inputSplit, inputSplitsPath, i,
@@ -1356,9 +1353,7 @@ public class BspServiceMaster<I extends WritableComparable,
*/
private void cleanUpOldSuperstep(long removeableSuperstep) throws
InterruptedException {
- if (!(getConfiguration().getBoolean(
- GiraphConstants.KEEP_ZOOKEEPER_DATA,
- GiraphConstants.KEEP_ZOOKEEPER_DATA_DEFAULT)) &&
+ if (KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration()) &&
(removeableSuperstep >= 0)) {
String oldSuperstepPath =
getSuperstepPath(getApplicationAttempt()) + "/" +
@@ -1523,7 +1518,7 @@ public class BspServiceMaster<I extends WritableComparable,
// If we have completed the maximum number of supersteps, stop
// the computation
if (maxNumberOfSupersteps !=
- GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS_DEFAULT &&
+ GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS.getDefaultValue() &&
(getSuperstep() == maxNumberOfSupersteps - 1)) {
if (LOG.isInfoEnabled()) {
LOG.info("coordinateSuperstep: Finished " + maxNumberOfSupersteps +
@@ -1661,9 +1656,7 @@ public class BspServiceMaster<I extends WritableComparable,
// provided (not dynamically started) and we don't want to keep the data
try {
if (getConfiguration().getZookeeperList() != null &&
- !getConfiguration().getBoolean(
- GiraphConstants.KEEP_ZOOKEEPER_DATA,
- GiraphConstants.KEEP_ZOOKEEPER_DATA_DEFAULT)) {
+ KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration())) {
if (LOG.isInfoEnabled()) {
LOG.info("cleanupZooKeeper: Removing the following path " +
"and all children - " + basePath + " from ZooKeeper list " +
@@ -1713,6 +1706,8 @@ public class BspServiceMaster<I extends WritableComparable,
@Override
public void cleanup() throws IOException {
+ ImmutableClassesGiraphConfiguration conf = getConfiguration();
+
// All master processes should denote they are done by adding special
// znode. Once the number of znodes equals the number of partitions
// for workers and masters, the master will clean up the ZooKeeper
@@ -1744,9 +1739,7 @@ public class BspServiceMaster<I extends WritableComparable,
if (isMaster) {
cleanUpZooKeeper();
// If desired, cleanup the checkpoint directory
- if (getConfiguration().getBoolean(
- GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS,
- GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT)) {
+ if (GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.get(conf)) {
boolean success =
getFs().delete(new Path(checkpointBasePath), true);
if (LOG.isInfoEnabled()) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
index 40c6b74..ba2f8eb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
@@ -19,11 +19,10 @@
package org.apache.giraph.master;
import org.apache.giraph.bsp.ApplicationState;
+import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.SuperstepState;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.counters.GiraphTimers;
-import org.apache.giraph.bsp.BspService;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -34,6 +33,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
+import static org.apache.giraph.conf.GiraphConstants.USE_SUPERSTEP_COUNTERS;
+
/**
* Master thread that will coordinate the activities of the tasks. It runs
* on all task processes, however, will only execute its algorithm if it knows
@@ -76,9 +77,7 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
this.bspServiceMaster = bspServiceMaster;
this.context = context;
GiraphTimers.init(context);
- superstepCounterOn = context.getConfiguration().getBoolean(
- GiraphConstants.USE_SUPERSTEP_COUNTERS,
- GiraphConstants.USE_SUPERSTEP_COUNTERS_DEFAULT);
+ superstepCounterOn = USE_SUPERSTEP_COUNTERS.get(context.getConfiguration());
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
index 6bc9591..3525302 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -18,12 +18,6 @@
package org.apache.giraph.partition;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
@@ -32,6 +26,12 @@ import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.log4j.Logger;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -55,6 +55,9 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import static org.apache.giraph.conf.GiraphConstants.MAX_PARTITIONS_IN_MEMORY;
+import static org.apache.giraph.conf.GiraphConstants.PARTITIONS_DIRECTORY;
+
/**
* Disk-backed PartitionStore. Partitions are stored in memory on a LRU basis.
* Thread-safe, but expects the caller to synchronized between deletes, adds,
@@ -128,14 +131,10 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
this.conf = conf;
this.context = context;
// We must be able to hold at least one partition in memory
- maxInMemoryPartitions = Math.max(1,
- conf.getInt(GiraphConstants.MAX_PARTITIONS_IN_MEMORY,
- GiraphConstants.MAX_PARTITIONS_IN_MEMORY_DEFAULT));
+ maxInMemoryPartitions = Math.max(MAX_PARTITIONS_IN_MEMORY.get(conf), 1);
// Take advantage of multiple disks
- String[] userPaths = conf.getStrings(
- GiraphConstants.PARTITIONS_DIRECTORY,
- GiraphConstants.PARTITIONS_DIRECTORY_DEFAULT);
+ String[] userPaths = PARTITIONS_DIRECTORY.getArray(conf);
basePaths = new String[userPaths.length];
int i = 0;
for (String path : userPaths) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
index c83ca45..fc75006 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
@@ -18,14 +18,15 @@
package org.apache.giraph.partition;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.log4j.Logger;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -34,6 +35,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import static org.apache.giraph.conf.GiraphConstants.USER_PARTITION_COUNT;
+
/**
* Helper class for {@link Partition} related operations.
*/
@@ -166,13 +169,10 @@ public class PartitionUtils {
"computePartitionCount: No available workers");
}
- int userPartitionCount = conf.getInt(GiraphConstants.USER_PARTITION_COUNT,
- GiraphConstants.DEFAULT_USER_PARTITION_COUNT);
+ int userPartitionCount = USER_PARTITION_COUNT.get(conf);
int partitionCount;
- if (userPartitionCount == GiraphConstants.DEFAULT_USER_PARTITION_COUNT) {
- float multiplier = conf.getFloat(
- GiraphConstants.PARTITION_COUNT_MULTIPLIER,
- GiraphConstants.DEFAULT_PARTITION_COUNT_MULTIPLIER);
+ if (userPartitionCount == USER_PARTITION_COUNT.getDefaultValue()) {
+ float multiplier = GiraphConstants.PARTITION_COUNT_MULTIPLIER.get(conf);
partitionCount =
Math.max((int) (multiplier * availableWorkerInfos.size() *
availableWorkerInfos.size()),
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
index ae8556f..23e0f05 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
@@ -18,7 +18,6 @@
package org.apache.giraph.partition;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -33,6 +32,8 @@ import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import static org.apache.giraph.conf.GiraphConstants.USE_OUT_OF_CORE_MESSAGES;
+
/**
* A simple map-based container that stores vertices. Vertex ids will map to
* exactly one partition.
@@ -57,8 +58,7 @@ public class SimplePartition<I extends WritableComparable,
@Override
public void initialize(int partitionId, Progressable progressable) {
super.initialize(partitionId, progressable);
- if (getConf().getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
- GiraphConstants.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
+ if (USE_OUT_OF_CORE_MESSAGES.get(getConf())) {
vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
} else {
vertexMap = Maps.newConcurrentMap();
@@ -114,8 +114,7 @@ public class SimplePartition<I extends WritableComparable,
@Override
public void readFields(DataInput input) throws IOException {
super.readFields(input);
- if (getConf().getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
- GiraphConstants.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
+ if (USE_OUT_OF_CORE_MESSAGES.get(getConf())) {
vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
} else {
vertexMap = Maps.newConcurrentMap();
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index 2bba672..4b03127 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -151,15 +151,15 @@ public class InternalVertexRunner {
}
conf.setWorkerConfiguration(1, 1, 100.0f);
- conf.setBoolean(GiraphConstants.SPLIT_MASTER_WORKER, false);
- conf.setBoolean(GiraphConstants.LOCAL_TEST_MODE, true);
+ GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
+ GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
conf.set(GiraphConstants.ZOOKEEPER_LIST, "localhost:" +
String.valueOf(LOCAL_ZOOKEEPER_PORT));
conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
- conf.set(GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY,
+ GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
zkMgrDir.toString());
- conf.set(GiraphConstants.CHECKPOINT_DIRECTORY, checkpointsDir.toString());
+ GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString());
for (Map.Entry<String, String> param : params.entrySet()) {
conf.set(param.getKey(), param.getValue());
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
index 463510f..aedee6c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
@@ -98,10 +98,8 @@ public class JMapHistoDumper implements MasterObserver, WorkerObserver {
@Override
public void setConf(ImmutableClassesGiraphConfiguration configuration) {
- sleepMillis = configuration.getInt(GiraphConstants.JMAP_SLEEP_MILLIS,
- GiraphConstants.JMAP_SLEEP_MILLIS_DEFAULT);
- linesToPrint = configuration.getInt(GiraphConstants.JMAP_PRINT_LINES,
- GiraphConstants.JMAP_PRINT_LINES_DEFAULT);
+ sleepMillis = GiraphConstants.JMAP_SLEEP_MILLIS.get(configuration);
+ linesToPrint = GiraphConstants.JMAP_PRINT_LINES.get(configuration);
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/zk/GiraphZooKeeperAdmin.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/GiraphZooKeeperAdmin.java b/giraph-core/src/main/java/org/apache/giraph/zk/GiraphZooKeeperAdmin.java
index 7589a09..19f6be5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/GiraphZooKeeperAdmin.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/GiraphZooKeeperAdmin.java
@@ -18,20 +18,22 @@
package org.apache.giraph.zk;
-import static java.lang.System.out;
-
-import java.net.UnknownHostException;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
import org.apache.giraph.bsp.BspService;
-import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.List;
+
+import static java.lang.System.out;
+import static org.apache.giraph.conf.GiraphConstants.ZOOKEEPER_SERVER_PORT;
/**
* A Utility class to be used by Giraph admins to occasionally clean up the
@@ -74,15 +76,13 @@ public class GiraphZooKeeperAdmin implements Watcher, Tool {
*/
@Override
public int run(String[] args) {
- final int zkPort = getConf().getInt(
- GiraphConfiguration.ZOOKEEPER_SERVER_PORT,
- GiraphConfiguration.ZOOKEEPER_SERVER_PORT_DEFAULT);
+ final int zkPort = ZOOKEEPER_SERVER_PORT.get(getConf());
final String zkBasePath = getConf().get(
- GiraphConfiguration.BASE_ZNODE_KEY, "") + BspService.BASE_DIR;
+ GiraphConstants.BASE_ZNODE_KEY, "") + BspService.BASE_DIR;
final String[] zkServerList;
try {
zkServerList = getConf()
- .get(GiraphConfiguration.ZOOKEEPER_LIST).split(",");
+ .get(GiraphConstants.ZOOKEEPER_LIST).split(",");
} catch (NullPointerException npe) {
throw new IllegalStateException("GiraphZooKeeperAdmin requires a list " +
"of ZooKeeper servers to clean.");
@@ -94,9 +94,9 @@ public class GiraphZooKeeperAdmin implements Watcher, Tool {
try {
ZooKeeperExt zooKeeper = new ZooKeeperExt(
formatZkServerList(zkServerList, zkPort),
- GiraphConfiguration.ZOOKEEPER_SESSION_TIMEOUT_DEFAULT,
- GiraphConfiguration.ZOOKEEPER_OPS_MAX_ATTEMPTS_DEFAULT,
- GiraphConfiguration.ZOOKEEPER_SERVERLIST_POLL_MSECS_DEFAULT,
+ GiraphConstants.ZOOKEEPER_SESSION_TIMEOUT.getDefaultValue(),
+ GiraphConstants.ZOOKEEPER_OPS_MAX_ATTEMPTS.getDefaultValue(),
+ GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.getDefaultValue(),
this);
doZooKeeperCleanup(zooKeeper, zkBasePath);
return 0;
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
index add57fc..82a7fc1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
@@ -52,6 +52,7 @@ import java.util.List;
import java.util.Map;
import static org.apache.giraph.conf.GiraphConstants.BASE_ZNODE_KEY;
+import static org.apache.giraph.conf.GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY;
/**
@@ -141,7 +142,7 @@ public class ZooKeeperManager {
taskPartition = conf.getTaskPartition();
jobId = conf.get("mapred.job.id", "Unknown Job");
baseDirectory =
- new Path(conf.get(GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY,
+ new Path(ZOOKEEPER_MANAGER_DIRECTORY.getWithDefault(conf,
getFinalZooKeeperPath()));
taskDirectory = new Path(baseDirectory,
"_task");
@@ -150,25 +151,19 @@ public class ZooKeeperManager {
myClosedPath = new Path(taskDirectory,
Integer.toString(taskPartition) +
COMPUTATION_DONE_SUFFIX);
- pollMsecs = conf.getInt(
- GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS,
- GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS_DEFAULT);
- serverCount = conf.getInt(
- GiraphConstants.ZOOKEEPER_SERVER_COUNT,
- GiraphConstants.ZOOKEEPER_SERVER_COUNT_DEFAULT);
+ pollMsecs = GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.get(conf);
+ serverCount = GiraphConstants.ZOOKEEPER_SERVER_COUNT.get(conf);
String jobLocalDir = conf.get("job.local.dir");
if (jobLocalDir != null) { // for non-local jobs
zkDirDefault = jobLocalDir +
"/_bspZooKeeper";
} else {
zkDirDefault = System.getProperty("user.dir") + "/" +
- GiraphConstants.ZOOKEEPER_MANAGER_DIR_DEFAULT;
+ ZOOKEEPER_MANAGER_DIRECTORY.getDefaultValue();
}
zkDir = conf.get(GiraphConstants.ZOOKEEPER_DIR, zkDirDefault);
configFilePath = zkDir + "/zoo.cfg";
- zkBasePort = conf.getInt(
- GiraphConstants.ZOOKEEPER_SERVER_PORT,
- GiraphConstants.ZOOKEEPER_SERVER_PORT_DEFAULT);
+ zkBasePort = GiraphConstants.ZOOKEEPER_SERVER_PORT.get(conf);
myHostname = conf.getLocalHostname();
fs = FileSystem.get(conf);
@@ -180,7 +175,7 @@ public class ZooKeeperManager {
* @return directory path with job id
*/
private String getFinalZooKeeperPath() {
- return GiraphConstants.ZOOKEEPER_MANAGER_DIR_DEFAULT + "/" + jobId;
+ return ZOOKEEPER_MANAGER_DIRECTORY.getDefaultValue() + "/" + jobId;
}
/**
@@ -305,7 +300,7 @@ public class ZooKeeperManager {
"for base directory " + baseDirectory + ". If there is an " +
"issue with this directory, please set an accesible " +
"base directory with the Hadoop configuration option " +
- GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY);
+ ZOOKEEPER_MANAGER_DIRECTORY.getKey());
}
Path myCandidacyPath = new Path(
@@ -627,9 +622,7 @@ public class ZooKeeperManager {
"onlineZooKeeperServers: java.home is not set!");
}
commandList.add(javaHome + "/bin/java");
- String zkJavaOptsString =
- conf.get(GiraphConstants.ZOOKEEPER_JAVA_OPTS,
- GiraphConstants.ZOOKEEPER_JAVA_OPTS_DEFAULT);
+ String zkJavaOptsString = GiraphConstants.ZOOKEEPER_JAVA_OPTS.get(conf);
String[] zkJavaOptsArray = zkJavaOptsString.split(" ");
if (zkJavaOptsArray != null) {
commandList.addAll(Arrays.asList(zkJavaOptsArray));
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/test/java/org/apache/giraph/BspCase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/BspCase.java b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
index a6eef20..4c74d3f 100644
--- a/giraph-core/src/test/java/org/apache/giraph/BspCase.java
+++ b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
@@ -98,11 +98,11 @@ public class BspCase implements Watcher {
"location " + getJarLocation() + " for " + getName());
conf.setWorkerConfiguration(1, 1, 100.0f);
// Single node testing
- conf.setBoolean(GiraphConstants.SPLIT_MASTER_WORKER, false);
+ GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
}
conf.setMaxMasterSuperstepWaitMsecs(30 * 1000);
conf.setEventWaitMsecs(3 * 1000);
- conf.setInt(GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS, 500);
+ GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.set(conf, 500);
if (getZooKeeperList() != null) {
conf.setZooKeeperConfiguration(getZooKeeperList());
}
@@ -120,9 +120,9 @@ public class BspCase implements Watcher {
FileUtils.deletePath(conf, checkPointDir);
conf.set(GiraphConstants.ZOOKEEPER_DIR, zookeeperDir.toString());
- conf.set(GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY,
+ GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
zkManagerDir.toString());
- conf.set(GiraphConstants.CHECKPOINT_DIRECTORY, checkPointDir.toString());
+ GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkPointDir.toString());
return conf;
}