You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/03/28 01:02:30 UTC
[3/3] git commit: GIRAPH-587: Refactor configuration options (nitay)
GIRAPH-587: Refactor configuration options (nitay)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/01c527e2
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/01c527e2
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/01c527e2
Branch: refs/heads/trunk
Commit: 01c527e225336d68a30e447889b37b6187f6286c
Parents: 460198a
Author: Nitay Joffe <ni...@apache.org>
Authored: Wed Mar 27 20:01:56 2013 -0400
Committer: Nitay Joffe <ni...@apache.org>
Committed: Wed Mar 27 20:02:15 2013 -0400
----------------------------------------------------------------------
CHANGELOG | 2 +
checkstyle.xml | 1 +
.../apache/giraph/benchmark/PageRankBenchmark.java | 2 +-
.../giraph/benchmark/ShortestPathsBenchmark.java | 2 +-
.../java/org/apache/giraph/bsp/BspInputFormat.java | 8 +-
.../java/org/apache/giraph/bsp/BspService.java | 8 +-
.../java/org/apache/giraph/comm/SendEdgeCache.java | 12 +-
.../org/apache/giraph/comm/SendMessageCache.java | 12 +-
.../java/org/apache/giraph/comm/ServerData.java | 17 +-
.../comm/messages/SequentialFileMessageStore.java | 10 +-
.../org/apache/giraph/comm/netty/NettyClient.java | 54 +-
.../org/apache/giraph/comm/netty/NettyServer.java | 38 +-
.../netty/NettyWorkerClientRequestProcessor.java | 42 +-
.../giraph/comm/netty/NettyWorkerServer.java | 12 +-
.../giraph/comm/netty/handler/RequestEncoder.java | 10 +-
.../comm/netty/handler/RequestServerHandler.java | 7 +-
.../comm/netty/handler/ResponseClientHandler.java | 7 +-
.../comm/netty/handler/SaslServerHandler.java | 7 +-
.../org/apache/giraph/conf/AbstractConfOption.java | 92 +++
.../java/org/apache/giraph/conf/AllOptions.java | 82 ++
.../org/apache/giraph/conf/BooleanConfOption.java | 96 +++
.../org/apache/giraph/conf/ClassConfOption.java | 182 +++++
.../org/apache/giraph/conf/ConfOptionType.java | 36 +
.../org/apache/giraph/conf/FloatConfOption.java | 80 ++
.../java/org/apache/giraph/conf/GiraphClasses.java | 46 +-
.../apache/giraph/conf/GiraphConfiguration.java | 211 ++----
.../org/apache/giraph/conf/GiraphConstants.java | 623 +++++++--------
.../conf/ImmutableClassesGiraphConfiguration.java | 5 +-
.../java/org/apache/giraph/conf/IntConfOption.java | 91 +++
.../org/apache/giraph/conf/LongConfOption.java | 80 ++
.../java/org/apache/giraph/conf/StrConfOption.java | 110 +++
.../org/apache/giraph/graph/GraphTaskManager.java | 26 +-
.../giraph/job/GiraphConfigurationValidator.java | 24 +-
.../main/java/org/apache/giraph/job/GiraphJob.java | 2 +-
.../org/apache/giraph/master/BspServiceMaster.java | 45 +-
.../org/apache/giraph/master/MasterThread.java | 9 +-
.../giraph/partition/DiskBackedPartitionStore.java | 23 +-
.../apache/giraph/partition/PartitionUtils.java | 16 +-
.../apache/giraph/partition/SimplePartition.java | 9 +-
.../apache/giraph/utils/InternalVertexRunner.java | 8 +-
.../org/apache/giraph/utils/JMapHistoDumper.java | 6 +-
.../org/apache/giraph/zk/GiraphZooKeeperAdmin.java | 32 +-
.../org/apache/giraph/zk/ZooKeeperManager.java | 25 +-
.../src/test/java/org/apache/giraph/BspCase.java | 8 +-
.../org/apache/giraph/comm/RequestFailureTest.java | 17 +-
.../java/org/apache/giraph/comm/RequestTest.java | 12 +-
.../org/apache/giraph/comm/SaslConnectionTest.java | 7 +-
.../giraph/conf/TestGiraphConfiguration.java | 7 +-
.../org/apache/giraph/conf/TestObjectCreation.java | 16 +-
.../apache/giraph/master/TestMasterObserver.java | 9 +-
.../giraph/partition/TestPartitionStores.java | 11 +-
.../java/org/apache/giraph/TestAutoCheckpoint.java | 8 +-
.../test/java/org/apache/giraph/TestBspBasic.java | 5 +-
.../org/apache/giraph/TestManualCheckpoint.java | 14 +-
.../org/apache/giraph/TestPartitionContext.java | 3 +-
.../aggregators/TestAggregatorsHandling.java | 28 +-
.../org/apache/giraph/examples/TestPageRank.java | 3 +-
.../examples/TryMultiIpcBindingPortsTest.java | 3 +-
.../org/apache/giraph/vertex/TestVertexTypes.java | 124 +--
59 files changed, 1535 insertions(+), 950 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 530d9ad..051c9a7 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-587: Refactor configuration options (nitay)
+
GIRAPH-581: More flexible Hive output (majakabiljo)
GIRAPH-579: Make it possible to use different out-edges data structures
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle.xml b/checkstyle.xml
index 3d8a6d4..370c120 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -221,6 +221,7 @@
<!-- Lines cannot exceed 80 chars -->
<module name="LineLength">
<property name="max" value="80"/>
+ <property name="ignorePattern" value="^import"/>
</module>
<!-- Over time, we will revise this down -->
<module name="MethodLength">
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
index 2902fa9..b5d7e1e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
@@ -209,7 +209,7 @@ public class PageRankBenchmark implements Tool {
}
LOG.info("Using edges class " +
- configuration.get(GiraphConstants.VERTEX_EDGES_CLASS));
+ GiraphConstants.VERTEX_EDGES_CLASS.get(configuration));
if (!cmd.hasOption('t') ||
(Integer.parseInt(cmd.getOptionValue('t')) == 1)) {
configuration.setVertexCombinerClass(DoubleSumCombiner.class);
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
index 1753f4f..c3c714e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
@@ -115,7 +115,7 @@ public class ShortestPathsBenchmark implements Tool {
job.getConfiguration().setVertexEdgesClass(HashMapEdges.class);
}
LOG.info("Using class " +
- job.getConfiguration().get(GiraphConstants.VERTEX_CLASS));
+ GiraphConstants.VERTEX_CLASS.get(job.getConfiguration()));
job.getConfiguration().setVertexInputFormatClass(
PseudoRandomVertexInputFormat.class);
if (!cmd.hasOption("nc")) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
index bce84b1..cc53271 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
@@ -50,14 +50,10 @@ public class BspInputFormat extends InputFormat<Text, Text> {
*/
public static int getMaxTasks(Configuration conf) {
int maxWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, 0);
- boolean splitMasterWorker =
- conf.getBoolean(GiraphConstants.SPLIT_MASTER_WORKER,
- GiraphConstants.SPLIT_MASTER_WORKER_DEFAULT);
+ boolean splitMasterWorker = GiraphConstants.SPLIT_MASTER_WORKER.get(conf);
int maxTasks = maxWorkers;
if (splitMasterWorker) {
- int zkServers =
- conf.getInt(GiraphConstants.ZOOKEEPER_SERVER_COUNT,
- GiraphConstants.ZOOKEEPER_SERVER_COUNT_DEFAULT);
+ int zkServers = GiraphConstants.ZOOKEEPER_SERVER_COUNT.get(conf);
maxTasks += zkServers;
}
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 969e2a5..187d111 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -52,6 +52,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import static org.apache.giraph.conf.GiraphConstants.CHECKPOINT_DIRECTORY;
+
/**
* Zookeeper-based implementation of {@link CentralizedService}.
*
@@ -311,9 +313,9 @@ public abstract class BspService<I extends WritableComparable,
EDGE_INPUT_SPLITS_ALL_READY_NODE, EDGE_INPUT_SPLITS_ALL_DONE_NODE);
applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
cleanedUpPath = basePath + CLEANED_UP_DIR;
- checkpointBasePath = getConfiguration().get(
- GiraphConstants.CHECKPOINT_DIRECTORY,
- GiraphConstants.CHECKPOINT_DIRECTORY_DEFAULT + "/" + getJobId());
+ checkpointBasePath =
+ CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(),
+ CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + getJobId());
masterElectionPath = basePath + MASTER_ELECTION_DIR;
if (LOG.isInfoEnabled()) {
LOG.info("BspService: Connecting to ZooKeeper with job " + jobId +
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
index 679cf6f..fbc911f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
@@ -19,7 +19,6 @@
package org.apache.giraph.comm;
import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.utils.ByteArrayVertexIdEdges;
@@ -28,6 +27,9 @@ import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import static org.apache.giraph.conf.GiraphConstants.ADDITIONAL_EDGE_REQUEST_SIZE;
+import static org.apache.giraph.conf.GiraphConstants.MAX_EDGE_REQUEST_SIZE;
+
/**
* Aggregates the edges to be sent to workers so they can be sent
* in bulk. Not thread-safe.
@@ -45,12 +47,8 @@ public class SendEdgeCache<I extends WritableComparable, E extends Writable>
*/
public SendEdgeCache(ImmutableClassesGiraphConfiguration conf,
CentralizedServiceWorker<?, ?, ?, ?> serviceWorker) {
- super(conf,
- serviceWorker,
- conf.getInt(GiraphConstants.MAX_EDGE_REQUEST_SIZE,
- GiraphConstants.MAX_EDGE_REQUEST_SIZE_DEFAULT),
- conf.getFloat(GiraphConstants.ADDITIONAL_EDGE_REQUEST_SIZE,
- GiraphConstants.ADDITIONAL_EDGE_REQUEST_SIZE_DEFAULT));
+ super(conf, serviceWorker, MAX_EDGE_REQUEST_SIZE.get(conf),
+ ADDITIONAL_EDGE_REQUEST_SIZE.get(conf));
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
index 07dc380..7d2a888 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
@@ -19,7 +19,6 @@
package org.apache.giraph.comm;
import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.PairList;
@@ -27,6 +26,9 @@ import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import static org.apache.giraph.conf.GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE;
+import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE;
+
/**
* Aggregates the messages to be sent to workers so they can be sent
* in bulk. Not thread-safe.
@@ -44,12 +46,8 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
*/
public SendMessageCache(ImmutableClassesGiraphConfiguration conf,
CentralizedServiceWorker<?, ?, ?, ?> serviceWorker) {
- super(conf,
- serviceWorker,
- conf.getInt(GiraphConstants.MAX_MSG_REQUEST_SIZE,
- GiraphConstants.MAX_MSG_REQUEST_SIZE_DEFAULT),
- conf.getFloat(GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE,
- GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE_DEFAULT));
+ super(conf, serviceWorker, MAX_MSG_REQUEST_SIZE.get(conf),
+ ADDITIONAL_MSG_REQUEST_SIZE.get(conf));
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 70dc156..743a6f8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -84,13 +84,13 @@ public class ServerData<I extends WritableComparable,
* Constructor.
*
* @param service Service worker
- * @param configuration Configuration
+ * @param conf Configuration
* @param messageStoreFactory Factory for message stores
* @param context Mapper context
*/
public ServerData(
CentralizedServiceWorker<I, V, E, M> service,
- ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+ ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
messageStoreFactory,
Mapper<?, ?, ?, ?>.Context context) {
@@ -98,17 +98,16 @@ public class ServerData<I extends WritableComparable,
this.messageStoreFactory = messageStoreFactory;
currentMessageStore = messageStoreFactory.newStore();
incomingMessageStore = messageStoreFactory.newStore();
- if (configuration.getBoolean(GiraphConstants.USE_OUT_OF_CORE_GRAPH,
- GiraphConstants.USE_OUT_OF_CORE_GRAPH_DEFAULT)) {
+ if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) {
partitionStore =
- new DiskBackedPartitionStore<I, V, E, M>(configuration, context);
+ new DiskBackedPartitionStore<I, V, E, M>(conf, context);
} else {
partitionStore =
- new SimplePartitionStore<I, V, E, M>(configuration, context);
+ new SimplePartitionStore<I, V, E, M>(conf, context);
}
- edgeStore = new EdgeStore<I, V, E, M>(service, configuration, context);
- ownerAggregatorData = new OwnerAggregatorServerData(context, configuration);
- allAggregatorData = new AllAggregatorServerData(context, configuration);
+ edgeStore = new EdgeStore<I, V, E, M>(service, conf, context);
+ ownerAggregatorData = new OwnerAggregatorServerData(context, conf);
+ allAggregatorData = new AllAggregatorServerData(context, conf);
}
public EdgeStore<I, V, E, M> getEdgeStore() {
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
index bdc5435..3fe4430 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
@@ -45,6 +45,8 @@ import java.util.List;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.giraph.conf.GiraphConstants.MESSAGES_DIRECTORY;
+
/**
* Used for writing and reading a collection of messages to the disk. {@link
* #addMessages(MessageStore<I, M>)} should be called only once with
@@ -377,9 +379,7 @@ public class SequentialFileMessageStore<I extends WritableComparable,
this.config = config;
String jobId = config.get("mapred.job.id", "Unknown Job");
int taskId = config.getTaskPartition();
- List<String> userPaths = Lists.newArrayList(config.getStrings(
- GiraphConstants.MESSAGES_DIRECTORY,
- GiraphConstants.MESSAGES_DIRECTORY_DEFAULT));
+ List<String> userPaths = MESSAGES_DIRECTORY.getList(config);
Collections.shuffle(userPaths);
directories = new String[userPaths.size()];
int i = 0;
@@ -389,9 +389,7 @@ public class SequentialFileMessageStore<I extends WritableComparable,
directories[i++] = directory;
new File(directory).mkdirs();
}
- this.bufferSize = config.getInt(
- GiraphConstants.MESSAGES_BUFFER_SIZE,
- GiraphConstants.MESSAGES_BUFFER_SIZE_DEFAULT);
+ this.bufferSize = GiraphConstants.MESSAGES_BUFFER_SIZE.get(config);
storeCounter = new AtomicInteger();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index af76410..30c32fd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -70,6 +70,15 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.giraph.conf.GiraphConstants.CLIENT_RECEIVE_BUFFER_SIZE;
+import static org.apache.giraph.conf.GiraphConstants.CLIENT_SEND_BUFFER_SIZE;
+import static org.apache.giraph.conf.GiraphConstants.MAX_REQUEST_MILLISECONDS;
+import static org.apache.giraph.conf.GiraphConstants.MAX_RESOLVE_ADDRESS_ATTEMPTS;
+import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_AFTER_HANDLER;
+import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_THREADS;
+import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER;
+import static org.apache.giraph.conf.GiraphConstants.NETTY_MAX_CONNECTION_FAILURES;
+import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
import static org.jboss.netty.channel.Channels.pipeline;
/**
@@ -175,15 +184,9 @@ public class NettyClient {
TaskInfo myTaskInfo) {
this.context = context;
this.myTaskInfo = myTaskInfo;
- this.channelsPerServer = conf.getInt(
- GiraphConstants.CHANNELS_PER_SERVER,
- GiraphConstants.DEFAULT_CHANNELS_PER_SERVER);
- sendBufferSize = conf.getInt(
- GiraphConstants.CLIENT_SEND_BUFFER_SIZE,
- GiraphConstants.DEFAULT_CLIENT_SEND_BUFFER_SIZE);
- receiveBufferSize = conf.getInt(
- GiraphConstants.CLIENT_RECEIVE_BUFFER_SIZE,
- GiraphConstants.DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE);
+ this.channelsPerServer = GiraphConstants.CHANNELS_PER_SERVER.get(conf);
+ sendBufferSize = CLIENT_SEND_BUFFER_SIZE.get(conf);
+ receiveBufferSize = CLIENT_RECEIVE_BUFFER_SIZE.get(conf);
limitNumberOfOpenRequests = conf.getBoolean(
LIMIT_NUMBER_OF_OPEN_REQUESTS,
@@ -200,39 +203,24 @@ public class NettyClient {
maxNumberOfOpenRequests = -1;
}
- maxRequestMilliseconds = conf.getInt(
- GiraphConstants.MAX_REQUEST_MILLISECONDS,
- GiraphConstants.MAX_REQUEST_MILLISECONDS_DEFAULT);
+ maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
- maxConnectionFailures = conf.getInt(
- GiraphConstants.NETTY_MAX_CONNECTION_FAILURES,
- GiraphConstants.NETTY_MAX_CONNECTION_FAILURES_DEFAULT);
+ maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
- waitingRequestMsecs = conf.getInt(
- GiraphConstants.WAITING_REQUEST_MSECS,
- GiraphConstants.WAITING_REQUEST_MSECS_DEFAULT);
+ waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
- maxPoolSize = conf.getInt(
- GiraphConstants.NETTY_CLIENT_THREADS,
- GiraphConstants.NETTY_CLIENT_THREADS_DEFAULT);
+ maxPoolSize = GiraphConstants.NETTY_CLIENT_THREADS.get(conf);
- maxResolveAddressAttempts = conf.getInt(
- GiraphConstants.MAX_RESOLVE_ADDRESS_ATTEMPTS,
- GiraphConstants.MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT);
+ maxResolveAddressAttempts = MAX_RESOLVE_ADDRESS_ATTEMPTS.get(conf);
clientRequestIdRequestInfoMap =
new MapMaker().concurrencyLevel(maxPoolSize).makeMap();
- handlerBeforeExecutionHandler = conf.get(
- GiraphConstants.NETTY_CLIENT_EXECUTION_AFTER_HANDLER,
- GiraphConstants.NETTY_CLIENT_EXECUTION_AFTER_HANDLER_DEFAULT);
- boolean useExecutionHandler = conf.getBoolean(
- GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER,
- GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER_DEFAULT);
+ handlerBeforeExecutionHandler =
+ NETTY_CLIENT_EXECUTION_AFTER_HANDLER.get(conf);
+ boolean useExecutionHandler = NETTY_CLIENT_USE_EXECUTION_HANDLER.get(conf);
if (useExecutionHandler) {
- int executionThreads = conf.getInt(
- GiraphConstants.NETTY_CLIENT_EXECUTION_THREADS,
- GiraphConstants.NETTY_CLIENT_EXECUTION_THREADS_DEFAULT);
+ int executionThreads = NETTY_CLIENT_EXECUTION_THREADS.get(conf);
executionHandler = new ExecutionHandler(
new MemoryAwareThreadPoolExecutor(
executionThreads, 1048576, 1048576, 1, TimeUnit.HOURS,
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
index f31dd4a..0bfc2d6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
@@ -60,6 +60,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import static org.apache.giraph.conf.GiraphConstants.MAX_IPC_PORT_BIND_ATTEMPTS;
import static org.jboss.netty.channel.Channels.pipeline;
/**
@@ -140,12 +141,8 @@ public class NettyServer {
this.saslServerHandlerFactory = new SaslServerHandler.Factory();
/*end[HADOOP_NON_SECURE]*/
this.myTaskInfo = myTaskInfo;
- sendBufferSize = conf.getInt(
- GiraphConstants.SERVER_SEND_BUFFER_SIZE,
- GiraphConstants.DEFAULT_SERVER_SEND_BUFFER_SIZE);
- receiveBufferSize = conf.getInt(
- GiraphConstants.SERVER_RECEIVE_BUFFER_SIZE,
- GiraphConstants.DEFAULT_SERVER_RECEIVE_BUFFER_SIZE);
+ sendBufferSize = GiraphConstants.SERVER_SEND_BUFFER_SIZE.get(conf);
+ receiveBufferSize = GiraphConstants.SERVER_RECEIVE_BUFFER_SIZE.get(conf);
workerRequestReservedMap = new WorkerRequestReservedMap(conf);
@@ -162,25 +159,21 @@ public class NettyServer {
throw new IllegalStateException("NettyServer: unable to get hostname");
}
- maxPoolSize = conf.getInt(
- GiraphConstants.NETTY_SERVER_THREADS,
- GiraphConstants.NETTY_SERVER_THREADS_DEFAULT);
+ maxPoolSize = GiraphConstants.NETTY_SERVER_THREADS.get(conf);
- tcpBacklog = conf.getInt(GiraphConstants.TCP_BACKLOG,
+ tcpBacklog = conf.getInt(GiraphConstants.TCP_BACKLOG.getKey(),
conf.getInt(GiraphConstants.MAX_WORKERS,
- GiraphConstants.TCP_BACKLOG_DEFAULT));
+ GiraphConstants.TCP_BACKLOG.getDefaultValue()));
channelFactory = new NioServerSocketChannelFactory(
bossExecutorService,
workerExecutorService,
maxPoolSize);
- handlerBeforeExecutionHandler = conf.get(
- GiraphConstants.NETTY_SERVER_EXECUTION_AFTER_HANDLER,
- GiraphConstants.NETTY_SERVER_EXECUTION_AFTER_HANDLER_DEFAULT);
- boolean useExecutionHandler = conf.getBoolean(
- GiraphConstants.NETTY_SERVER_USE_EXECUTION_HANDLER,
- GiraphConstants.NETTY_SERVER_USE_EXECUTION_HANDLER_DEFAULT);
+ handlerBeforeExecutionHandler =
+ GiraphConstants.NETTY_SERVER_EXECUTION_AFTER_HANDLER.get(conf);
+ boolean useExecutionHandler =
+ GiraphConstants.NETTY_SERVER_USE_EXECUTION_HANDLER.get(conf);
if (useExecutionHandler) {
int executionThreads = conf.getNettyServerExecutionThreads();
executionHandler = new ExecutionHandler(
@@ -304,16 +297,11 @@ public class NettyServer {
int numServers = conf.getInt(GiraphConstants.MAX_WORKERS, numTasks) + 1;
int portIncrementConstant =
(int) Math.pow(10, Math.ceil(Math.log10(numServers)));
- int bindPort = conf.getInt(GiraphConstants.IPC_INITIAL_PORT,
- GiraphConstants.IPC_INITIAL_PORT_DEFAULT) +
- taskId;
+ int bindPort = GiraphConstants.IPC_INITIAL_PORT.get(conf) + taskId;
int bindAttempts = 0;
- final int maxIpcPortBindAttempts =
- conf.getInt(GiraphConstants.MAX_IPC_PORT_BIND_ATTEMPTS,
- GiraphConstants.MAX_IPC_PORT_BIND_ATTEMPTS_DEFAULT);
+ final int maxIpcPortBindAttempts = MAX_IPC_PORT_BIND_ATTEMPTS.get(conf);
final boolean failFirstPortBindingAttempt =
- conf.getBoolean(GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT,
- GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT_DEFAULT);
+ GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT.get(conf);
// Simple handling of port collisions on the same machine while
// preserving debugability from the port number alone.
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index e58030e..db4ff5d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -17,9 +17,6 @@
*/
package org.apache.giraph.comm.netty;
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.util.PercentGauge;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.SendEdgeCache;
@@ -37,9 +34,9 @@ import org.apache.giraph.comm.requests.SendWorkerEdgesRequest;
import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
import org.apache.giraph.comm.requests.WorkerRequest;
import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MetricNames;
@@ -49,16 +46,23 @@ import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.utils.ByteArrayVertexIdEdges;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.PairList;
-import org.apache.giraph.graph.Vertex;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.util.PercentGauge;
+
import java.io.IOException;
import java.util.Map;
+import static org.apache.giraph.conf.GiraphConstants.MAX_EDGE_REQUEST_SIZE;
+import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE;
+import static org.apache.giraph.conf.GiraphConstants.MAX_MUTATIONS_PER_REQUEST;
+
/**
* Aggregate requests and sends them to the thread-safe NettyClient. This
* class is not thread-safe and expected to be used and then thrown away after
@@ -112,30 +116,22 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
* Constructor.
*
* @param context Context
- * @param configuration Configuration
+ * @param conf Configuration
* @param serviceWorker Service worker
*/
public NettyWorkerClientRequestProcessor(
Mapper<?, ?, ?, ?>.Context context,
- ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+ ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
CentralizedServiceWorker<I, V, E, M> serviceWorker) {
this.workerClient = serviceWorker.getWorkerClient();
- this.configuration = configuration;
-
- sendPartitionCache = new SendPartitionCache<I, V, E, M>(context,
- configuration);
- sendMessageCache =
- new SendMessageCache<I, M>(configuration, serviceWorker);
- sendEdgeCache = new SendEdgeCache<I, E>(configuration, serviceWorker);
- maxMessagesSizePerWorker = configuration.getInt(
- GiraphConstants.MAX_MSG_REQUEST_SIZE,
- GiraphConstants.MAX_MSG_REQUEST_SIZE_DEFAULT);
- maxEdgesSizePerWorker = configuration.getInt(
- GiraphConstants.MAX_EDGE_REQUEST_SIZE,
- GiraphConstants.MAX_EDGE_REQUEST_SIZE_DEFAULT);
- maxMutationsPerPartition = configuration.getInt(
- GiraphConstants.MAX_MUTATIONS_PER_REQUEST,
- GiraphConstants.MAX_MUTATIONS_PER_REQUEST_DEFAULT);
+ this.configuration = conf;
+
+ sendPartitionCache = new SendPartitionCache<I, V, E, M>(context, conf);
+ sendMessageCache = new SendMessageCache<I, M>(conf, serviceWorker);
+ sendEdgeCache = new SendEdgeCache<I, E>(conf, serviceWorker);
+ maxMessagesSizePerWorker = MAX_MSG_REQUEST_SIZE.get(conf);
+ maxEdgesSizePerWorker = MAX_EDGE_REQUEST_SIZE.get(conf);
+ maxMutationsPerPartition = MAX_MUTATIONS_PER_REQUEST.get(conf);
this.serviceWorker = serviceWorker;
this.serverData = serviceWorker.getServerData();
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index 1fb0580..ed0861e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -31,7 +31,6 @@ import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.comm.messages.OneMessagePerVertexStore;
import org.apache.giraph.comm.messages.SequentialFileMessageStore;
import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.Vertex;
@@ -51,6 +50,9 @@ import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Map.Entry;
+import static org.apache.giraph.conf.GiraphConstants.MAX_MESSAGES_IN_MEMORY;
+import static org.apache.giraph.conf.GiraphConstants.USE_OUT_OF_CORE_MESSAGES;
+
/**
* Netty worker server that implements {@link WorkerServer} and contains
* the actual {@link ServerData}.
@@ -107,9 +109,7 @@ public class NettyWorkerServer<I extends WritableComparable,
*/
private MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
createMessageStoreFactory() {
- boolean useOutOfCoreMessaging = conf.getBoolean(
- GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
- GiraphConstants.USE_OUT_OF_CORE_MESSAGES_DEFAULT);
+ boolean useOutOfCoreMessaging = USE_OUT_OF_CORE_MESSAGES.get(conf);
if (!useOutOfCoreMessaging) {
if (conf.useCombiner()) {
if (LOG.isInfoEnabled()) {
@@ -126,9 +126,7 @@ public class NettyWorkerServer<I extends WritableComparable,
return ByteArrayMessagesPerVertexStore.newFactory(service, conf);
}
} else {
- int maxMessagesInMemory = conf.getInt(
- GiraphConstants.MAX_MESSAGES_IN_MEMORY,
- GiraphConstants.MAX_MESSAGES_IN_MEMORY_DEFAULT);
+ int maxMessagesInMemory = MAX_MESSAGES_IN_MEMORY.get(conf);
if (LOG.isInfoEnabled()) {
LOG.info("createMessageStoreFactory: Using DiskBackedMessageStore, " +
"maxMessagesInMemory = " + maxMessagesInMemory);
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
index 4e739cb..83b408e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
@@ -55,12 +55,10 @@ public class RequestEncoder extends OneToOneEncoder {
* @param conf Giraph configuration
*/
public RequestEncoder(GiraphConfiguration conf) {
- bufferStartingSize = conf.getInt(
- GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE,
- GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT);
- useDirectBuffers = conf.getBoolean(
- GiraphConstants.NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS,
- GiraphConstants.NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS_DEFAULT);
+ bufferStartingSize =
+ GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE.get(conf);
+ useDirectBuffers =
+ GiraphConstants.NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS.get(conf);
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
index bbf31c7..a02039e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
@@ -19,7 +19,6 @@
package org.apache.giraph.comm.netty.handler;
import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.TaskInfo;
import org.apache.giraph.time.SystemTime;
@@ -34,6 +33,8 @@ import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED;
+
/**
* Generic handler of requests.
*
@@ -71,9 +72,7 @@ public abstract class RequestServerHandler<R> extends
ImmutableClassesGiraphConfiguration conf,
TaskInfo myTaskInfo) {
this.workerRequestReservedMap = workerRequestReservedMap;
- closeFirstRequest = conf.getBoolean(
- GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED,
- GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT);
+ closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf);
this.myTaskInfo = myTaskInfo;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
index 1803be4..9f3f034 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
@@ -18,7 +18,6 @@
package org.apache.giraph.comm.netty.handler;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -32,6 +31,8 @@ import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
+import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_RESPONSE_FAILED;
+
/**
* Generic handler of responses.
*/
@@ -59,9 +60,7 @@ public class ResponseClientHandler extends SimpleChannelUpstreamHandler {
workerIdOutstandingRequestMap,
Configuration conf) {
this.workerIdOutstandingRequestMap = workerIdOutstandingRequestMap;
- dropFirstResponse = conf.getBoolean(
- GiraphConstants.NETTY_SIMULATE_FIRST_RESPONSE_FAILED,
- GiraphConstants.NETTY_SIMULATE_FIRST_RESPONSE_FAILED_DEFAULT);
+ dropFirstResponse = NETTY_SIMULATE_FIRST_RESPONSE_FAILED.get(conf);
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
index d06fd09..9644a5f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
@@ -24,7 +24,6 @@ import org.apache.giraph.comm.requests.RequestType;
import org.apache.giraph.comm.requests.SaslCompleteRequest;
import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.security.TokenCache;
@@ -45,6 +44,8 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.util.Collection;
+import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED;
+
/**
* Generate SASL response tokens to client SASL tokens, allowing clients to
* authenticate themselves with this server.
@@ -74,9 +75,7 @@ public class SaslServerHandler extends
Configuration conf) throws IOException {
SaslNettyServer.init(conf);
setupSecretManager(conf);
- closeFirstRequest = conf.getBoolean(
- GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED,
- GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT);
+ closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf);
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/AbstractConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/AbstractConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/AbstractConfOption.java
new file mode 100644
index 0000000..d00f7e9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/AbstractConfOption.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ComparisonChain;
+
+/**
+ * Abstract base class of configuration options
+ */
+public abstract class AbstractConfOption
+ implements Comparable<AbstractConfOption> {
+ /** Logger */
+ private static final Logger LOG = Logger.getLogger(AbstractConfOption.class);
+
+ /** Key for configuration */
+ private final String key;
+
+ /**
+ * Constructor
+ * @param key configuration key
+ */
+ public AbstractConfOption(String key) {
+ this.key = key;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ @Override public int compareTo(AbstractConfOption o) {
+ return ComparisonChain.start()
+ .compare(getType(), o.getType())
+ .compare(key, o.key)
+ .result();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof AbstractConfOption)) {
+ return false;
+ }
+
+ AbstractConfOption that = (AbstractConfOption) o;
+ return Objects.equal(getType(), that.getType()) &&
+ Objects.equal(key, that.key);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(key);
+ }
+
+ @Override public String toString() {
+ StringBuilder sb = new StringBuilder(30);
+ sb.append(" ").append(key).append(" => ").append(getDefaultValueStr());
+ sb.append(" (").append(getType().toString().toLowerCase()).append(")\n");
+ return sb.toString();
+ }
+
+ /**
+ * Get string representation of default value
+ * @return String
+ */
+ public abstract String getDefaultValueStr();
+
+ /**
+ * Get type this option holds
+ * @return ConfOptionType
+ */
+ public abstract ConfOptionType getType();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java b/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
new file mode 100644
index 0000000..cceaaef
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_CLASS;
+
+/**
+ * Tracks all of the Giraph options
+ */
+public class AllOptions {
+ /** Logger */
+ private static final Logger LOG = Logger.getLogger(AllOptions.class);
+
+ /** Configuration options */
+ private static final List<AbstractConfOption> CONF_OPTIONS =
+ Lists.newArrayList();
+
+ /** Don't construct */
+ private AllOptions() { }
+
+ /**
+ * Add an option. Subclasses of {@link AbstractConfOption} should call this
+ * at the end of their constructor.
+ * @param confOption option
+ */
+ public static void add(AbstractConfOption confOption) {
+ CONF_OPTIONS.add(confOption);
+ }
+
+ /**
+ * String representation of all of the options stored
+ * @return string
+ */
+ public static String allOptionsString() {
+ Collections.sort(CONF_OPTIONS);
+ StringBuilder sb = new StringBuilder(CONF_OPTIONS.size() * 30);
+ sb.append("All Options:\n");
+ ConfOptionType lastType = null;
+ for (AbstractConfOption confOption : CONF_OPTIONS) {
+ if (!confOption.getType().equals(lastType)) {
+ sb.append(confOption.getType().toString().toLowerCase()).append(":\n");
+ lastType = confOption.getType();
+ }
+ sb.append(confOption);
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Command line utility to dump all Giraph options
+ * @param args cmdline args
+ */
+ public static void main(String[] args) {
+ // This is necessary to trigger the static constants in GiraphConstants to
+ // get loaded. Without it we get no output.
+ VERTEX_CLASS.toString();
+
+ LOG.info(allOptionsString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/BooleanConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/BooleanConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/BooleanConfOption.java
new file mode 100644
index 0000000..c16ec88
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/BooleanConfOption.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Boolean configuration option
+ */
+public class BooleanConfOption extends AbstractConfOption {
+ /** Default value */
+ private final boolean defaultValue;
+
+ /**
+ * Constructor
+ * @param key configuration key
+ * @param defaultValue default value
+ */
+ public BooleanConfOption(String key, boolean defaultValue) {
+ super(key);
+ this.defaultValue = defaultValue;
+ AllOptions.add(this);
+ }
+
+ public boolean isDefaultValue() {
+ return defaultValue;
+ }
+
+ @Override public String getDefaultValueStr() {
+ return Boolean.toString(defaultValue);
+ }
+
+ @Override public ConfOptionType getType() {
+ return ConfOptionType.BOOLEAN;
+ }
+
+ /**
+ * Lookup value in Configuration
+ * @param conf Configuration
+ * @return value for key in conf, or defaultValue if not present
+ */
+ public boolean get(Configuration conf) {
+ return conf.getBoolean(getKey(), defaultValue);
+ }
+
+ /**
+ * Check if value is false
+ * @param conf Configuration
+ * @return true if value for key (or defaultValue if not present) is false
+ */
+ public boolean isFalse(Configuration conf) {
+ return !get(conf);
+ }
+
+ /**
+ * Check if value is true
+ * @param conf Configuration
+ * @return true if value for key (or defaultValue if not present) is true
+ */
+ public boolean isTrue(Configuration conf) {
+ return get(conf);
+ }
+
+ /**
+ * Set value in configuration for this key
+ * @param conf Configuration
+ * @param value to set
+ */
+ public void set(Configuration conf, boolean value) {
+ conf.setBoolean(getKey(), value);
+ }
+
+ /**
+ * Set value in configuration if it hasn't been set already
+ * @param conf Configuration
+ * @param value to set
+ */
+ public void setIfUnset(Configuration conf, boolean value) {
+ conf.setBooleanIfUnset(getKey(), value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java
new file mode 100644
index 0000000..d67e0a5
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+/**
+ * Class configuration option
+ * @param <C> interface of class
+ */
+public class ClassConfOption<C> extends AbstractConfOption {
+ /** Logger */
+ private static final Logger LOG = Logger.getLogger(ClassConfOption.class);
+
+ /** Base interface for class */
+ private final Class<C> interfaceClass;
+ /** Default class if not set in configuration */
+ private final Class<? extends C> defaultClass;
+
+ /**
+ * Private constructor
+ * @param key Key
+ * @param defaultClass default class
+ * @param interfaceClass interface class
+ */
+ private ClassConfOption(String key, Class<? extends C> defaultClass,
+ Class<C> interfaceClass) {
+ super(key);
+ this.defaultClass = defaultClass;
+ this.interfaceClass = interfaceClass;
+ AllOptions.add(this);
+ }
+
+ /**
+ * Static create method
+ * @param key key
+ * @param defaultClass default class
+ * @param interfaceClass interface class
+ * @param <T> type of class
+ * @return ClassConfOption
+ */
+ public static <T> ClassConfOption<T> create(String key,
+ Class<? extends T> defaultClass, Class<T> interfaceClass) {
+ return new ClassConfOption<T>(key, defaultClass, interfaceClass);
+ }
+
+ public Class<? extends C> getDefaultClass() {
+ return defaultClass;
+ }
+
+ public Class<C> getInterfaceClass() {
+ return interfaceClass;
+ }
+
+ @Override public String getDefaultValueStr() {
+ return defaultClass == null ? "null" : defaultClass.getSimpleName();
+ }
+
+ @Override public ConfOptionType getType() {
+ return ConfOptionType.CLASS;
+ }
+
+ @Override public String toString() {
+ StringBuilder sb = new StringBuilder(30);
+ sb.append(" ");
+ sb.append(getKey()).append(" => ").append(getDefaultValueStr());
+ sb.append(" [").append(interfaceClass.getSimpleName()).append("] ");
+ sb.append(" (").append(getType().toString().toLowerCase()).append(")\n");
+ return sb.toString();
+ }
+
+ /**
+ * Lookup value
+ * @param conf Configuration
+ * @return Class set for key, or defaultClass
+ */
+ public Class<? extends C> get(Configuration conf) {
+ return conf.getClass(getKey(), defaultClass, interfaceClass);
+ }
+
+ /**
+ * Lookup array of classes for key
+ * @param conf Configuration
+ * @return array of classes
+ */
+ public Class<? extends C>[] getArray(Configuration conf) {
+ return getClassesOfType(conf, getKey(), interfaceClass);
+ }
+
+ /**
+ * Get classes from a property that all implement a given interface.
+ *
+ * @param conf Configuration
+ * @param name String name of property to fetch.
+ * @param xface interface classes must implement.
+ * @param defaultValue If not found, return this
+ * @param <T> Generic type of interface class
+ * @return array of Classes implementing interface specified.
+ */
+ public static <T> Class<? extends T>[] getClassesOfType(Configuration conf,
+ String name, Class<T> xface, Class<? extends T> ... defaultValue) {
+ Class<?>[] klasses = conf.getClasses(name, defaultValue);
+ for (Class<?> klass : klasses) {
+ if (!xface.isAssignableFrom(klass)) {
+ throw new RuntimeException(klass + " is not assignable from " +
+ xface.getName());
+ }
+ }
+ return (Class<? extends T>[]) klasses;
+ }
+
+ /**
+ * Lookup with user specified default value
+ * @param conf Configuration
+ * @param defaultValue default value
+ * @return Class
+ */
+ public Class<? extends C> getWithDefault(Configuration conf,
+ Class<? extends C> defaultValue) {
+ return conf.getClass(getKey(), defaultValue, interfaceClass);
+ }
+
+ /**
+ * Set value for key
+ * @param conf Configuration
+ * @param klass Class to set
+ */
+ public void set(Configuration conf, Class<? extends C> klass) {
+ conf.setClass(getKey(), klass, interfaceClass);
+ }
+
+ /**
+ * Add class to list for key
+ * @param conf Configuration
+ * @param klass Class to add
+ */
+ public void add(Configuration conf, Class<? extends C> klass) {
+ addToClasses(conf, getKey(), klass, interfaceClass);
+ }
+
+ /**
+ * Add a class to a property that is a list of classes. If the property does
+ * not exist it will be created.
+ *
+ * @param <T> type of class
+ * @param conf Configuration
+ * @param name String name of property.
+ * @param klass Class to add to the list.
+ * @param xface interface the class being added must implement.
+ */
+ public static <T> void addToClasses(Configuration conf, String name,
+ Class<? extends T> klass, Class<T> xface) {
+ if (!xface.isAssignableFrom(klass)) {
+ throw new RuntimeException(klass + " does not implement " +
+ xface.getName());
+ }
+ String value;
+ String klasses = conf.get(name);
+ if (klasses == null) {
+ value = klass.getName();
+ } else {
+ value = klasses + "," + klass.getName();
+ }
+ conf.set(name, value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/ConfOptionType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ConfOptionType.java b/giraph-core/src/main/java/org/apache/giraph/conf/ConfOptionType.java
new file mode 100644
index 0000000..8f70d90
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ConfOptionType.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+/**
+ * Type of value for a Configuration option
+ */
+public enum ConfOptionType {
+ /** boolean */
+ BOOLEAN,
+ /** class */
+ CLASS,
+ /** integer */
+ INTEGER,
+ /** float */
+ FLOAT,
+ /** long */
+ LONG,
+ /** string */
+ STRING;
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/FloatConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/FloatConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/FloatConfOption.java
new file mode 100644
index 0000000..fa21a28
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/FloatConfOption.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Float Configuration option
+ */
+public class FloatConfOption extends AbstractConfOption {
+ /** Default value */
+ private final float defaultValue;
+
+ /**
+ * Constructor
+ * @param key Configuration key
+ * @param defaultValue default value
+ */
+ public FloatConfOption(String key, float defaultValue) {
+ super(key);
+ this.defaultValue = defaultValue;
+ AllOptions.add(this);
+ }
+
+ public float getDefaultValue() {
+ return defaultValue;
+ }
+
+ @Override public String getDefaultValueStr() {
+ return Float.toString(defaultValue);
+ }
+
+ @Override public ConfOptionType getType() {
+ return ConfOptionType.FLOAT;
+ }
+
+ /**
+ * Lookup value
+ * @param conf Configuration
+ * @return value for key, or defaultValue if not present
+ */
+ public float get(Configuration conf) {
+ return conf.getFloat(getKey(), defaultValue);
+ }
+
+ /**
+ * Set value
+ * @param conf Configuration
+ * @param value to set
+ */
+ public void set(Configuration conf, float value) {
+ conf.setFloat(getKey(), value);
+ }
+
+ /**
+ * Set value if it's not already present
+ * @param conf Configuration
+ * @param value to set
+ */
+ public void setIfUnset(Configuration conf, float value) {
+ if (conf.get(getKey()) == null) {
+ conf.setFloat(getKey(), value);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index c13f3a2..23dab79 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -149,56 +149,42 @@ public class GiraphClasses<I extends WritableComparable,
*/
public void readFromConf(Configuration conf) {
// set pre-validated generic parameter types into Configuration
- vertexClass = (Class<? extends Vertex<I, V, E, M>>)
- conf.getClass(VERTEX_CLASS, null, Vertex.class);
+ vertexClass = (Class<? extends Vertex<I, V, E, M>>) VERTEX_CLASS.get(conf);
List<Class<?>> classList = ReflectionUtils.getTypeArguments(Vertex.class,
vertexClass);
vertexIdClass = (Class<I>) classList.get(0);
vertexValueClass = (Class<V>) classList.get(1);
edgeValueClass = (Class<E>) classList.get(2);
messageValueClass = (Class<M>) classList.get(3);
+
vertexEdgesClass = (Class<? extends VertexEdges<I, E>>)
- conf.getClass(VERTEX_EDGES_CLASS, ByteArrayEdges.class,
- VertexEdges.class);
+ VERTEX_EDGES_CLASS.get(conf);
inputVertexEdgesClass = (Class<? extends VertexEdges<I, E>>)
- conf.getClass(INPUT_VERTEX_EDGES_CLASS, vertexEdgesClass,
- VertexEdges.class);
+ INPUT_VERTEX_EDGES_CLASS.getWithDefault(conf, vertexEdgesClass);
vertexValueFactoryClass = (Class<? extends VertexValueFactory<V>>)
- conf.getClass(VERTEX_VALUE_FACTORY_CLASS,
- DefaultVertexValueFactory.class, VertexValueFactory.class);
+ VERTEX_VALUE_FACTORY_CLASS.get(conf);
graphPartitionerFactoryClass =
(Class<? extends GraphPartitionerFactory<I, V, E, M>>)
- conf.getClass(GRAPH_PARTITIONER_FACTORY_CLASS,
- HashPartitionerFactory.class,
- GraphPartitionerFactory.class);
+ GRAPH_PARTITIONER_FACTORY_CLASS.get(conf);
vertexInputFormatClass = (Class<? extends VertexInputFormat<I, V, E, M>>)
- conf.getClass(VERTEX_INPUT_FORMAT_CLASS,
- null, VertexInputFormat.class);
+ VERTEX_INPUT_FORMAT_CLASS.get(conf);
vertexOutputFormatClass = (Class<? extends VertexOutputFormat<I, V, E>>)
- conf.getClass(VERTEX_OUTPUT_FORMAT_CLASS,
- null, VertexOutputFormat.class);
+ VERTEX_OUTPUT_FORMAT_CLASS.get(conf);
edgeInputFormatClass = (Class<? extends EdgeInputFormat<I, E>>)
- conf.getClass(EDGE_INPUT_FORMAT_CLASS,
- null, EdgeInputFormat.class);
+ EDGE_INPUT_FORMAT_CLASS.get(conf);
- aggregatorWriterClass = conf.getClass(AGGREGATOR_WRITER_CLASS,
- TextAggregatorWriter.class, AggregatorWriter.class);
+ aggregatorWriterClass = AGGREGATOR_WRITER_CLASS.get(conf);
combinerClass = (Class<? extends Combiner<I, M>>)
- conf.getClass(VERTEX_COMBINER_CLASS, null, Combiner.class);
+ VERTEX_COMBINER_CLASS.get(conf);
vertexResolverClass = (Class<? extends VertexResolver<I, V, E, M>>)
- conf.getClass(VERTEX_RESOLVER_CLASS,
- DefaultVertexResolver.class, VertexResolver.class);
- partitionContextClass = conf.getClass(PARTITION_CONTEXT_CLASS,
- DefaultPartitionContext.class, PartitionContext.class);
- workerContextClass = conf.getClass(WORKER_CONTEXT_CLASS,
- DefaultWorkerContext.class, WorkerContext.class);
- masterComputeClass = conf.getClass(MASTER_COMPUTE_CLASS,
- DefaultMasterCompute.class, MasterCompute.class);
-
+ VERTEX_RESOLVER_CLASS.get(conf);
+ partitionContextClass = PARTITION_CONTEXT_CLASS.get(conf);
+ workerContextClass = WORKER_CONTEXT_CLASS.get(conf);
+ masterComputeClass = MASTER_COMPUTE_CLASS.get(conf);
partitionClass = (Class<? extends Partition<I, V, E, M>>)
- conf.getClass(PARTITION_CLASS, SimplePartition.class);
+ PARTITION_CLASS.get(conf);
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index ffcae6e..dee8e98 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -21,19 +21,18 @@ package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.Combiner;
import org.apache.giraph.edge.VertexEdges;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.VertexValueFactory;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
-import org.apache.giraph.job.DefaultJobObserver;
import org.apache.giraph.job.GiraphJobObserver;
import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.master.MasterObserver;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionContext;
-import org.apache.giraph.graph.Vertex;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerObserver;
import org.apache.hadoop.conf.Configuration;
@@ -72,7 +71,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setVertexClass(
Class<? extends Vertex> vertexClass) {
- setClass(VERTEX_CLASS, vertexClass, Vertex.class);
+ VERTEX_CLASS.set(this, vertexClass);
}
/**
@@ -82,8 +81,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setVertexValueFactoryClass(
Class<? extends VertexValueFactory> vertexValueFactoryClass) {
- setClass(VERTEX_VALUE_FACTORY_CLASS, vertexValueFactoryClass,
- VertexValueFactory.class);
+ VERTEX_VALUE_FACTORY_CLASS.set(this, vertexValueFactoryClass);
}
/**
@@ -93,7 +91,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setVertexEdgesClass(
Class<? extends VertexEdges> vertexEdgesClass) {
- setClass(VERTEX_EDGES_CLASS, vertexEdgesClass, VertexEdges.class);
+ VERTEX_EDGES_CLASS.set(this, vertexEdgesClass);
}
/**
@@ -104,8 +102,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setInputVertexEdgesClass(
Class<? extends VertexEdges> inputVertexEdgesClass) {
- setClass(INPUT_VERTEX_EDGES_CLASS, inputVertexEdgesClass,
- VertexEdges.class);
+ INPUT_VERTEX_EDGES_CLASS.set(this, inputVertexEdgesClass);
}
/**
@@ -115,9 +112,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setVertexInputFormatClass(
Class<? extends VertexInputFormat> vertexInputFormatClass) {
- setClass(VERTEX_INPUT_FORMAT_CLASS,
- vertexInputFormatClass,
- VertexInputFormat.class);
+ VERTEX_INPUT_FORMAT_CLASS.set(this, vertexInputFormatClass);
}
/**
@@ -127,9 +122,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setEdgeInputFormatClass(
Class<? extends EdgeInputFormat> edgeInputFormatClass) {
- setClass(EDGE_INPUT_FORMAT_CLASS,
- edgeInputFormatClass,
- EdgeInputFormat.class);
+ EDGE_INPUT_FORMAT_CLASS.set(this, edgeInputFormatClass);
}
/**
@@ -139,8 +132,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setMasterComputeClass(
Class<? extends MasterCompute> masterComputeClass) {
- setClass(MASTER_COMPUTE_CLASS, masterComputeClass,
- MasterCompute.class);
+ MASTER_COMPUTE_CLASS.set(this, masterComputeClass);
}
/**
@@ -150,8 +142,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void addMasterObserverClass(
Class<? extends MasterObserver> masterObserverClass) {
- addToClasses(MASTER_OBSERVER_CLASSES, masterObserverClass,
- MasterObserver.class);
+ MASTER_OBSERVER_CLASSES.add(this, masterObserverClass);
}
/**
@@ -161,8 +152,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void addWorkerObserverClass(
Class<? extends WorkerObserver> workerObserverClass) {
- addToClasses(WORKER_OBSERVER_CLASSES, workerObserverClass,
- WorkerObserver.class);
+ WORKER_OBSERVER_CLASSES.add(this, workerObserverClass);
}
/**
@@ -171,8 +161,7 @@ public class GiraphConfiguration extends Configuration
* @return GiraphJobObserver class set.
*/
public Class<? extends GiraphJobObserver> getJobObserverClass() {
- return getClass(JOB_OBSERVER_CLASS, DefaultJobObserver.class,
- GiraphJobObserver.class);
+ return JOB_OBSERVER_CLASS.get(this);
}
/**
@@ -181,7 +170,7 @@ public class GiraphConfiguration extends Configuration
* @param klass GiraphJobObserver class to set.
*/
public void setJobObserverClass(Class<? extends GiraphJobObserver> klass) {
- setClass(JOB_OBSERVER_CLASS, klass, GiraphJobObserver.class);
+ JOB_OBSERVER_CLASS.set(this, klass);
}
/**
@@ -190,30 +179,7 @@ public class GiraphConfiguration extends Configuration
* @return true if jmap dumper is enabled.
*/
public boolean isJMapHistogramDumpEnabled() {
- return getBoolean(JMAP_ENABLE, JMAP_ENABLE_DEFAULT);
- }
-
- /**
- * Add a class to a property that is a list of classes. If the property does
- * not exist it will be created.
- *
- * @param name String name of property.
- * @param klass interface of the class being set.
- * @param xface Class to add to the list.
- */
- public final void addToClasses(String name, Class<?> klass, Class<?> xface) {
- if (!xface.isAssignableFrom(klass)) {
- throw new RuntimeException(klass + " does not implement " +
- xface.getName());
- }
- String value;
- String klasses = get(name);
- if (klasses == null) {
- value = klass.getName();
- } else {
- value = klasses + "," + klass.getName();
- }
- set(name, value);
+ return JMAP_ENABLE.get(this);
}
/**
@@ -238,36 +204,13 @@ public class GiraphConfiguration extends Configuration
}
/**
- * Get classes from a property that all implement a given interface.
- *
- * @param name String name of property to fetch.
- * @param xface interface classes must implement.
- * @param defaultValue If not found, return this
- * @param <T> Generic type of interface class
- * @return array of Classes implementing interface specified.
- */
- public final <T> Class<? extends T>[] getClassesOfType(String name,
- Class<T> xface, Class<? extends T> ... defaultValue) {
- Class<?>[] klasses = getClasses(name, defaultValue);
- for (Class<?> klass : klasses) {
- if (!xface.isAssignableFrom(klass)) {
- throw new RuntimeException(klass + " is not assignable from " +
- xface.getName());
- }
- }
- return (Class<? extends T>[]) klasses;
- }
-
- /**
* Set the vertex output format class (optional)
*
* @param vertexOutputFormatClass Determines how graph is output
*/
public final void setVertexOutputFormatClass(
Class<? extends VertexOutputFormat> vertexOutputFormatClass) {
- setClass(VERTEX_OUTPUT_FORMAT_CLASS,
- vertexOutputFormatClass,
- VertexOutputFormat.class);
+ VERTEX_OUTPUT_FORMAT_CLASS.set(this, vertexOutputFormatClass);
}
/**
@@ -277,7 +220,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setVertexCombinerClass(
Class<? extends Combiner> vertexCombinerClass) {
- setClass(VERTEX_COMBINER_CLASS, vertexCombinerClass, Combiner.class);
+ VERTEX_COMBINER_CLASS.set(this, vertexCombinerClass);
}
/**
@@ -286,10 +229,8 @@ public class GiraphConfiguration extends Configuration
* @param graphPartitionerFactoryClass Determines how the graph is partitioned
*/
public final void setGraphPartitionerFactoryClass(
- Class<?> graphPartitionerFactoryClass) {
- setClass(GRAPH_PARTITIONER_FACTORY_CLASS,
- graphPartitionerFactoryClass,
- GraphPartitionerFactory.class);
+ Class<? extends GraphPartitionerFactory> graphPartitionerFactoryClass) {
+ GRAPH_PARTITIONER_FACTORY_CLASS.set(this, graphPartitionerFactoryClass);
}
/**
@@ -299,7 +240,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setVertexResolverClass(
Class<? extends VertexResolver> vertexResolverClass) {
- setClass(VERTEX_RESOLVER_CLASS, vertexResolverClass, VertexResolver.class);
+ VERTEX_RESOLVER_CLASS.set(this, vertexResolverClass);
}
/**
@@ -309,7 +250,7 @@ public class GiraphConfiguration extends Configuration
* @return true if we should create non existent vertices that get messages.
*/
public final boolean getResolverCreateVertexOnMessages() {
- return getBoolean(RESOLVER_CREATE_VERTEX_ON_MSGS, true);
+ return RESOLVER_CREATE_VERTEX_ON_MSGS.get(this);
}
/**
@@ -318,7 +259,7 @@ public class GiraphConfiguration extends Configuration
* @param v true if we should create vertices when they get messages.
*/
public final void setResolverCreateVertexOnMessages(boolean v) {
- setBoolean(RESOLVER_CREATE_VERTEX_ON_MSGS, v);
+ RESOLVER_CREATE_VERTEX_ON_MSGS.set(this, v);
}
/**
@@ -329,8 +270,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setPartitionContextClass(
Class<? extends PartitionContext> partitionContextClass) {
- setClass(PARTITION_CONTEXT_CLASS, partitionContextClass,
- PartitionContext.class);
+ PARTITION_CONTEXT_CLASS.set(this, partitionContextClass);
}
/**
@@ -341,7 +281,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setWorkerContextClass(
Class<? extends WorkerContext> workerContextClass) {
- setClass(WORKER_CONTEXT_CLASS, workerContextClass, WorkerContext.class);
+ WORKER_CONTEXT_CLASS.set(this, workerContextClass);
}
/**
@@ -352,9 +292,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setAggregatorWriterClass(
Class<? extends AggregatorWriter> aggregatorWriterClass) {
- setClass(AGGREGATOR_WRITER_CLASS,
- aggregatorWriterClass,
- AggregatorWriter.class);
+ AGGREGATOR_WRITER_CLASS.set(this, aggregatorWriterClass);
}
/**
@@ -364,9 +302,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setPartitionClass(
Class<? extends Partition> partitionClass) {
- setClass(PARTITION_CLASS,
- partitionClass,
- Partition.class);
+ PARTITION_CLASS.set(this, partitionClass);
}
/**
@@ -384,7 +320,7 @@ public class GiraphConfiguration extends Configuration
float minPercentResponded) {
setInt(MIN_WORKERS, minWorkers);
setInt(MAX_WORKERS, maxWorkers);
- setFloat(MIN_PERCENT_RESPONDED, minPercentResponded);
+ MIN_PERCENT_RESPONDED.set(this, minPercentResponded);
}
public final int getMinWorkers() {
@@ -396,7 +332,7 @@ public class GiraphConfiguration extends Configuration
}
public final float getMinPercentResponded() {
- return getFloat(MIN_PERCENT_RESPONDED, MIN_PERCENT_RESPONDED_DEFAULT);
+ return MIN_PERCENT_RESPONDED.get(this);
}
/**
@@ -411,7 +347,7 @@ public class GiraphConfiguration extends Configuration
}
public final boolean getSplitMasterWorker() {
- return getBoolean(SPLIT_MASTER_WORKER, SPLIT_MASTER_WORKER_DEFAULT);
+ return SPLIT_MASTER_WORKER.get(this);
}
/**
@@ -420,7 +356,7 @@ public class GiraphConfiguration extends Configuration
* @return array of MasterObserver classes.
*/
public Class<? extends MasterObserver>[] getMasterObserverClasses() {
- return getClassesOfType(MASTER_OBSERVER_CLASSES, MasterObserver.class);
+ return MASTER_OBSERVER_CLASSES.getArray(this);
}
/**
@@ -429,7 +365,7 @@ public class GiraphConfiguration extends Configuration
* @return array of WorkerObserver classes.
*/
public Class<? extends WorkerObserver>[] getWorkerObserverClasses() {
- return getClassesOfType(WORKER_OBSERVER_CLASSES, WorkerObserver.class);
+ return WORKER_OBSERVER_CLASSES.getArray(this);
}
/**
@@ -438,7 +374,7 @@ public class GiraphConfiguration extends Configuration
* @return true if metrics are enabled, false otherwise (default)
*/
public boolean metricsEnabled() {
- return getBoolean(METRICS_ENABLE, false);
+ return METRICS_ENABLE.isTrue(this);
}
/**
@@ -460,7 +396,7 @@ public class GiraphConfiguration extends Configuration
}
public String getLocalLevel() {
- return get(LOG_LEVEL, LOG_LEVEL_DEFAULT);
+ return LOG_LEVEL.get(this);
}
/**
@@ -469,15 +405,15 @@ public class GiraphConfiguration extends Configuration
* @return True if use the log thread layout option, false otherwise
*/
public boolean useLogThreadLayout() {
- return getBoolean(LOG_THREAD_LAYOUT, LOG_THREAD_LAYOUT_DEFAULT);
+ return LOG_THREAD_LAYOUT.get(this);
}
public boolean getLocalTestMode() {
- return getBoolean(LOCAL_TEST_MODE, LOCAL_TEST_MODE_DEFAULT);
+ return LOCAL_TEST_MODE.get(this);
}
public int getZooKeeperServerCount() {
- return getInt(ZOOKEEPER_SERVER_COUNT, ZOOKEEPER_SERVER_COUNT_DEFAULT);
+ return ZOOKEEPER_SERVER_COUNT.get(this);
}
/**
@@ -490,31 +426,27 @@ public class GiraphConfiguration extends Configuration
}
public int getZooKeeperSessionTimeout() {
- return getInt(ZOOKEEPER_SESSION_TIMEOUT, ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
+ return ZOOKEEPER_SESSION_TIMEOUT.get(this);
}
public int getZookeeperOpsMaxAttempts() {
- return getInt(ZOOKEEPER_OPS_MAX_ATTEMPTS,
- ZOOKEEPER_OPS_MAX_ATTEMPTS_DEFAULT);
+ return ZOOKEEPER_OPS_MAX_ATTEMPTS.get(this);
}
public int getZookeeperOpsRetryWaitMsecs() {
- return getInt(ZOOKEEPER_OPS_RETRY_WAIT_MSECS,
- ZOOKEEPER_OPS_RETRY_WAIT_MSECS_DEFAULT);
+ return ZOOKEEPER_OPS_RETRY_WAIT_MSECS.get(this);
}
public boolean getNettyServerUseExecutionHandler() {
- return getBoolean(NETTY_SERVER_USE_EXECUTION_HANDLER,
- NETTY_SERVER_USE_EXECUTION_HANDLER_DEFAULT);
+ return NETTY_SERVER_USE_EXECUTION_HANDLER.get(this);
}
public int getNettyServerThreads() {
- return getInt(NETTY_SERVER_THREADS, NETTY_SERVER_THREADS_DEFAULT);
+ return NETTY_SERVER_THREADS.get(this);
}
public int getNettyServerExecutionThreads() {
- return getInt(NETTY_SERVER_EXECUTION_THREADS,
- NETTY_SERVER_EXECUTION_THREADS_DEFAULT);
+ return NETTY_SERVER_EXECUTION_THREADS.get(this);
}
/**
@@ -532,26 +464,23 @@ public class GiraphConfiguration extends Configuration
}
public int getZookeeperConnectionAttempts() {
- return getInt(ZOOKEEPER_CONNECTION_ATTEMPTS,
- ZOOKEEPER_CONNECTION_ATTEMPTS_DEFAULT);
+ return ZOOKEEPER_CONNECTION_ATTEMPTS.get(this);
}
public int getZooKeeperMinSessionTimeout() {
- return getInt(ZOOKEEPER_MIN_SESSION_TIMEOUT,
- DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT);
+ return ZOOKEEPER_MIN_SESSION_TIMEOUT.get(this);
}
public int getZooKeeperMaxSessionTimeout() {
- return getInt(ZOOKEEPER_MAX_SESSION_TIMEOUT,
- DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT);
+ return ZOOKEEPER_MAX_SESSION_TIMEOUT.get(this);
}
public String getZooKeeperForceSync() {
- return get(ZOOKEEPER_FORCE_SYNC, DEFAULT_ZOOKEEPER_FORCE_SYNC);
+ return ZOOKEEPER_FORCE_SYNC.get(this);
}
public String getZooKeeperSkipAcl() {
- return get(ZOOKEEPER_SKIP_ACL, DEFAULT_ZOOKEEPER_SKIP_ACL);
+ return ZOOKEEPER_SKIP_ACL.get(this);
}
/**
@@ -574,7 +503,7 @@ public class GiraphConfiguration extends Configuration
* @return True if should authenticate, false otherwise
*/
public boolean authenticate() {
- return getBoolean(AUTHENTICATE, DEFAULT_AUTHENTICATE);
+ return AUTHENTICATE.get(this);
}
/**
@@ -583,11 +512,11 @@ public class GiraphConfiguration extends Configuration
* @param numComputeThreads Number of compute threads to use
*/
public void setNumComputeThreads(int numComputeThreads) {
- setInt(NUM_COMPUTE_THREADS, numComputeThreads);
+ NUM_COMPUTE_THREADS.set(this, numComputeThreads);
}
public int getNumComputeThreads() {
- return getInt(NUM_COMPUTE_THREADS, NUM_COMPUTE_THREADS_DEFAULT);
+ return NUM_COMPUTE_THREADS.get(this);
}
/**
@@ -596,19 +525,19 @@ public class GiraphConfiguration extends Configuration
* @param numInputSplitsThreads Number of input split threads to use
*/
public void setNumInputSplitsThreads(int numInputSplitsThreads) {
- setInt(NUM_INPUT_SPLITS_THREADS, numInputSplitsThreads);
+ NUM_INPUT_SPLITS_THREADS.set(this, numInputSplitsThreads);
}
public int getNumInputSplitsThreads() {
- return getInt(NUM_INPUT_SPLITS_THREADS, NUM_INPUT_SPLITS_THREADS_DEFAULT);
+ return NUM_INPUT_SPLITS_THREADS.get(this);
}
public long getInputSplitMaxVertices() {
- return getLong(INPUT_SPLIT_MAX_VERTICES, INPUT_SPLIT_MAX_VERTICES_DEFAULT);
+ return INPUT_SPLIT_MAX_VERTICES.get(this);
}
public long getInputSplitMaxEdges() {
- return getLong(INPUT_SPLIT_MAX_EDGES, INPUT_SPLIT_MAX_EDGES_DEFAULT);
+ return INPUT_SPLIT_MAX_EDGES.get(this);
}
/**
@@ -617,7 +546,7 @@ public class GiraphConfiguration extends Configuration
* @param useUnsafeSerialization If true, use unsafe serialization
*/
public void useUnsafeSerialization(boolean useUnsafeSerialization) {
- setBoolean(USE_UNSAFE_SERIALIZATION, useUnsafeSerialization);
+ USE_UNSAFE_SERIALIZATION.set(this, useUnsafeSerialization);
}
/**
@@ -627,8 +556,7 @@ public class GiraphConfiguration extends Configuration
* @return Whether to use message size encoding
*/
public boolean useMessageSizeEncoding() {
- return getBoolean(
- USE_MESSAGE_SIZE_ENCODING, USE_MESSAGE_SIZE_ENCODING_DEFAULT);
+ return USE_MESSAGE_SIZE_ENCODING.get(this);
}
/**
@@ -638,7 +566,7 @@ public class GiraphConfiguration extends Configuration
* @param checkpointFrequency How often to checkpoint (0 means never)
*/
public void setCheckpointFrequency(int checkpointFrequency) {
- setInt(CHECKPOINT_FREQUENCY, checkpointFrequency);
+ CHECKPOINT_FREQUENCY.set(this, checkpointFrequency);
}
/**
@@ -648,7 +576,7 @@ public class GiraphConfiguration extends Configuration
* @return Checkpoint frequency (0 means never)
*/
public int getCheckpointFrequency() {
- return getInt(CHECKPOINT_FREQUENCY, CHECKPOINT_FREQUENCY_DEFAULT);
+ return CHECKPOINT_FREQUENCY.get(this);
}
/**
@@ -666,7 +594,7 @@ public class GiraphConfiguration extends Configuration
* @param maxTaskAttempts Max task attempts to use
*/
public void setMaxTaskAttempts(int maxTaskAttempts) {
- setInt(MAX_TASK_ATTEMPTS, maxTaskAttempts);
+ MAX_TASK_ATTEMPTS.set(this, maxTaskAttempts);
}
/**
@@ -675,7 +603,7 @@ public class GiraphConfiguration extends Configuration
* @return Max task attempts or -1, if not set
*/
public int getMaxTaskAttempts() {
- return getInt(MAX_TASK_ATTEMPTS, -1);
+ return MAX_TASK_ATTEMPTS.get(this);
}
/**
@@ -684,7 +612,7 @@ public class GiraphConfiguration extends Configuration
* @return Number of milliseconds to wait for an event before continuing on
*/
public int getEventWaitMsecs() {
- return getInt(EVENT_WAIT_MSECS, EVENT_WAIT_MSECS_DEFAULT);
+ return EVENT_WAIT_MSECS.get(this);
}
/**
@@ -694,7 +622,7 @@ public class GiraphConfiguration extends Configuration
* continuing on
*/
public void setEventWaitMsecs(int eventWaitMsecs) {
- setInt(EVENT_WAIT_MSECS, eventWaitMsecs);
+ EVENT_WAIT_MSECS.set(this, eventWaitMsecs);
}
/**
@@ -705,8 +633,7 @@ public class GiraphConfiguration extends Configuration
* minimum number of workers before a superstep
*/
public int getMaxMasterSuperstepWaitMsecs() {
- return getInt(MAX_MASTER_SUPERSTEP_WAIT_MSECS,
- MAX_MASTER_SUPERSTEP_WAIT_MSECS_DEFAULT);
+ return MAX_MASTER_SUPERSTEP_WAIT_MSECS.get(this);
}
/**
@@ -718,7 +645,7 @@ public class GiraphConfiguration extends Configuration
* number of workers before a superstep
*/
public void setMaxMasterSuperstepWaitMsecs(int maxMasterSuperstepWaitMsecs) {
- setInt(MAX_MASTER_SUPERSTEP_WAIT_MSECS, maxMasterSuperstepWaitMsecs);
+ MAX_MASTER_SUPERSTEP_WAIT_MSECS.set(this, maxMasterSuperstepWaitMsecs);
}
/**
@@ -738,8 +665,7 @@ public class GiraphConfiguration extends Configuration
* @return True iff we want to use input split locality
*/
public boolean useInputSplitLocality() {
- return getBoolean(GiraphConstants.USE_INPUT_SPLIT_LOCALITY,
- GiraphConstants.USE_INPUT_SPLIT_LOCALITY_DEFAULT);
+ return USE_INPUT_SPLIT_LOCALITY.get(this);
}
/**
@@ -748,8 +674,7 @@ public class GiraphConfiguration extends Configuration
* @return True iff we can reuse incoming edge objects.
*/
public boolean reuseIncomingEdgeObjects() {
- return getBoolean(GiraphConstants.REUSE_INCOMING_EDGE_OBJECTS,
- GiraphConstants.REUSE_INCOMING_EDGE_OBJECTS_DEFAULT);
+ return GiraphConstants.REUSE_INCOMING_EDGE_OBJECTS.get(this);
}
/**
@@ -760,8 +685,8 @@ public class GiraphConfiguration extends Configuration
*/
public String getLocalHostname() throws UnknownHostException {
return DNS.getDefaultHost(
- get(GiraphConstants.DNS_INTERFACE, "default"),
- get(GiraphConstants.DNS_NAMESERVER, "default"));
+ GiraphConstants.DNS_INTERFACE.get(this),
+ GiraphConstants.DNS_NAMESERVER.get(this));
}
/**
@@ -771,7 +696,7 @@ public class GiraphConfiguration extends Configuration
* @param maxNumberOfSupersteps Maximum number of supersteps
*/
public void setMaxNumberOfSupersteps(int maxNumberOfSupersteps) {
- setInt(MAX_NUMBER_OF_SUPERSTEPS, maxNumberOfSupersteps);
+ MAX_NUMBER_OF_SUPERSTEPS.set(this, maxNumberOfSupersteps);
}
/**
@@ -781,6 +706,6 @@ public class GiraphConfiguration extends Configuration
* @return Maximum number of supersteps
*/
public int getMaxNumberOfSupersteps() {
- return getInt(MAX_NUMBER_OF_SUPERSTEPS, MAX_NUMBER_OF_SUPERSTEPS_DEFAULT);
+ return MAX_NUMBER_OF_SUPERSTEPS.get(this);
}
}