You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/03/28 01:02:28 UTC
[1/3] GIRAPH-587: Refactor configuration options (nitay)
Updated Branches:
refs/heads/trunk 460198af9 -> 01c527e22
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
index cc7cb17..58aa7d1 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
@@ -18,7 +18,6 @@
package org.apache.giraph.comm;
-import com.google.common.collect.Lists;
import org.apache.giraph.comm.netty.NettyClient;
import org.apache.giraph.comm.netty.NettyServer;
import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
@@ -27,16 +26,18 @@ import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.MockUtils;
import org.apache.giraph.utils.PairList;
-import org.apache.giraph.graph.Vertex;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.collect.Lists;
+
import java.io.IOException;
import static org.junit.Assert.assertEquals;
@@ -135,11 +136,11 @@ public class RequestFailureTest {
@Test
public void alreadyProcessedRequest() throws IOException {
// Force a drop of the first request
- conf.setBoolean(GiraphConstants.NETTY_SIMULATE_FIRST_RESPONSE_FAILED, true);
+ GiraphConstants.NETTY_SIMULATE_FIRST_RESPONSE_FAILED.set(conf, true);
// One second to finish a request
- conf.setInt(GiraphConstants.MAX_REQUEST_MILLISECONDS, 1000);
+ GiraphConstants.MAX_REQUEST_MILLISECONDS.set(conf, 1000);
// Loop every 2 seconds
- conf.setInt(GiraphConstants.WAITING_REQUEST_MSECS, 2000);
+ GiraphConstants.WAITING_REQUEST_MSECS.set(conf, 2000);
checkSendingTwoRequests();
}
@@ -147,11 +148,11 @@ public class RequestFailureTest {
@Test
public void resendRequest() throws IOException {
// Force a drop of the first request
- conf.setBoolean(GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED, true);
+ GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED.set(conf, true);
// One second to finish a request
- conf.setInt(GiraphConstants.MAX_REQUEST_MILLISECONDS, 1000);
+ GiraphConstants.MAX_REQUEST_MILLISECONDS.set(conf, 1000);
// Loop every 2 seconds
- conf.setInt(GiraphConstants.WAITING_REQUEST_MSECS, 2000);
+ GiraphConstants.WAITING_REQUEST_MSECS.set(conf, 2000);
checkSendingTwoRequests();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index c8d6b3b..f1f8e26 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -18,8 +18,6 @@
package org.apache.giraph.comm;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.giraph.comm.netty.NettyClient;
import org.apache.giraph.comm.netty.NettyServer;
import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
@@ -29,22 +27,25 @@ import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
+import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionStore;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.MockUtils;
import org.apache.giraph.utils.PairList;
-import org.apache.giraph.graph.Vertex;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
@@ -85,8 +86,7 @@ public class RequestTest {
public void setUp() throws IOException {
// Setup the conf
GiraphConfiguration tmpConf = new GiraphConfiguration();
- tmpConf.setClass(GiraphConstants.VERTEX_CLASS, TestVertex.class,
- Vertex.class);
+ GiraphConstants.VERTEX_CLASS.set(tmpConf, TestVertex.class);
conf = new ImmutableClassesGiraphConfiguration(tmpConf);
@SuppressWarnings("rawtypes")
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
index 3094f3e..c27156f 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
@@ -18,7 +18,6 @@
package org.apache.giraph.comm;
-import com.google.common.collect.Lists;
import org.apache.giraph.comm.netty.NettyClient;
import org.apache.giraph.comm.netty.NettyServer;
import org.apache.giraph.comm.netty.handler.SaslServerHandler;
@@ -26,8 +25,8 @@ import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.MockUtils;
import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.MockUtils;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper.Context;
@@ -35,6 +34,8 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import com.google.common.collect.Lists;
+
import java.io.IOException;
import static org.mockito.Mockito.mock;
@@ -58,7 +59,7 @@ public class SaslConnectionTest {
public void setUp() {
GiraphConfiguration tmpConfig = new GiraphConfiguration();
tmpConfig.setVertexClass(IntVertex.class);
- tmpConfig.setBoolean(GiraphConstants.AUTHENTICATE, true);
+ GiraphConstants.AUTHENTICATE.set(tmpConfig, true);
conf = new ImmutableClassesGiraphConfiguration(tmpConfig);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/test/java/org/apache/giraph/conf/TestGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/conf/TestGiraphConfiguration.java b/giraph-core/src/test/java/org/apache/giraph/conf/TestGiraphConfiguration.java
index cd37197..f0d3bb1 100644
--- a/giraph-core/src/test/java/org/apache/giraph/conf/TestGiraphConfiguration.java
+++ b/giraph-core/src/test/java/org/apache/giraph/conf/TestGiraphConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.giraph.conf;
import org.junit.Test;
+import static org.apache.giraph.conf.ClassConfOption.getClassesOfType;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -45,7 +46,7 @@ public class TestGiraphConfiguration {
assertEquals(2, conf.getClasses("foo").length);
}
- Class<? extends If>[] klasses2 = conf.getClassesOfType("foo", If.class);
+ Class<? extends If>[] klasses2 = getClassesOfType(conf, "foo", If.class);
assertEquals(2, klasses2.length);
assertEquals(A.class, klasses2[0]);
assertEquals(B.class, klasses2[1]);
@@ -56,14 +57,14 @@ public class TestGiraphConfiguration {
GiraphConfiguration conf = new GiraphConfiguration();
conf.setClasses("foo", If.class, A.class, B.class);
- conf.addToClasses("foo", C.class, If.class);
+ ClassConfOption.addToClasses(conf, "foo", C.class, If.class);
Class<?>[] klasses = conf.getClasses("foo");
assertEquals(3, klasses.length);
assertEquals(A.class, klasses[0]);
assertEquals(B.class, klasses[1]);
assertEquals(C.class, klasses[2]);
- conf.addToClasses("bar", B.class, If.class);
+ ClassConfOption.addToClasses(conf, "bar", B.class, If.class);
klasses = conf.getClasses("bar");
assertEquals(1, klasses.length);
assertEquals(B.class, klasses[0]);
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java b/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java
index 9f0920a..9075145 100644
--- a/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java
+++ b/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java
@@ -18,15 +18,13 @@
package org.apache.giraph.conf;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
import org.apache.giraph.time.Times;
-import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -58,14 +56,10 @@ public class TestObjectCreation {
@Before
public void setUp() {
GiraphConfiguration conf = new GiraphConfiguration();
- conf.setClass(GiraphConstants.VERTEX_ID_CLASS, IntWritable.class,
- WritableComparable.class);
- conf.setClass(GiraphConstants.VERTEX_VALUE_CLASS, LongWritable.class,
- Writable.class);
- conf.setClass(GiraphConstants.EDGE_VALUE_CLASS, DoubleWritable.class,
- Writable.class);
- conf.setClass(GiraphConstants.MESSAGE_VALUE_CLASS, LongWritable.class,
- Writable.class);
+ GiraphConstants.VERTEX_ID_CLASS.set(conf, IntWritable.class);
+ GiraphConstants.VERTEX_VALUE_CLASS.set(conf, LongWritable.class);
+ GiraphConstants.EDGE_VALUE_CLASS.set(conf, DoubleWritable.class);
+ GiraphConstants.MESSAGE_VALUE_CLASS.set(conf, LongWritable.class);
conf.setVertexClass(ImmutableVertex.class);
configuration =
new ImmutableClassesGiraphConfiguration<LongWritable, LongWritable,
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/test/java/org/apache/giraph/master/TestMasterObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestMasterObserver.java b/giraph-core/src/test/java/org/apache/giraph/master/TestMasterObserver.java
index 54a8b92..10bb4d0 100644
--- a/giraph-core/src/test/java/org/apache/giraph/master/TestMasterObserver.java
+++ b/giraph-core/src/test/java/org/apache/giraph/master/TestMasterObserver.java
@@ -18,18 +18,19 @@
package org.apache.giraph.master;
-import com.google.common.collect.Maps;
import org.apache.giraph.conf.GiraphClasses;
import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.io.formats.IntNullNullNullTextInputFormat;
-import org.apache.giraph.utils.InternalVertexRunner;
import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.formats.IntNullNullNullTextInputFormat;
+import org.apache.giraph.utils.InternalVertexRunner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
+import com.google.common.collect.Maps;
+
import java.io.IOException;
import java.util.Map;
@@ -87,7 +88,7 @@ public class TestMasterObserver {
Obs.class.getName(),
Obs.class.getName()
};
- params.put(GiraphConstants.MASTER_OBSERVER_CLASSES,
+ params.put(GiraphConstants.MASTER_OBSERVER_CLASSES.getKey(),
StringUtils.arrayToString(klasses));
GiraphClasses classes = new GiraphClasses();
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
index d403dd8..8e79a47 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
@@ -18,19 +18,20 @@
package org.apache.giraph.partition;
-import com.google.common.collect.Iterables;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
-import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.collect.Iterables;
+
import java.io.IOException;
import static org.junit.Assert.assertEquals;
@@ -134,8 +135,8 @@ public class TestPartitionStores {
@Test
public void testDiskBackedPartitionStore() {
- conf.setBoolean(GiraphConstants.USE_OUT_OF_CORE_GRAPH, true);
- conf.setInt(GiraphConstants.MAX_PARTITIONS_IN_MEMORY, 1);
+ GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
+ GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
PartitionStore<IntWritable, IntWritable, NullWritable, IntWritable>
partitionStore = new DiskBackedPartitionStore<IntWritable,
@@ -143,7 +144,7 @@ public class TestPartitionStores {
testReadWrite(partitionStore, conf);
partitionStore.shutdown();
- conf.setInt(GiraphConstants.MAX_PARTITIONS_IN_MEMORY, 2);
+ GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 2);
partitionStore = new DiskBackedPartitionStore<IntWritable,
IntWritable, NullWritable, IntWritable>(conf, context);
testReadWrite(partitionStore, conf);
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-examples/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestAutoCheckpoint.java b/giraph-examples/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
index efbe320..386f67b 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
@@ -77,11 +77,11 @@ public class TestAutoCheckpoint extends BspCase {
conf.setMaxMasterSuperstepWaitMsecs(10000);
conf.setEventWaitMsecs(1000);
conf.setCheckpointFrequency(2);
- conf.set(GiraphConstants.CHECKPOINT_DIRECTORY,
+ GiraphConstants.CHECKPOINT_DIRECTORY.set(conf,
getTempPath("_singleFaultCheckpoints").toString());
- conf.setBoolean(GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false);
- conf.setInt(GiraphConstants.ZOOKEEPER_SESSION_TIMEOUT, 10000);
- conf.setInt(GiraphConstants.ZOOKEEPER_MIN_SESSION_TIMEOUT, 10000);
+ GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(conf, false);
+ GiraphConstants.ZOOKEEPER_SESSION_TIMEOUT.set(conf, 10000);
+ GiraphConstants.ZOOKEEPER_MIN_SESSION_TIMEOUT.set(conf, 10000);
assertTrue(job.run(true));
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
index 5234730..666a50b 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -200,7 +200,7 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
GiraphJob job = prepareJob(getCallingMethodName(), classes);
GiraphConfiguration conf = job.getConfiguration();
conf.setWorkerConfiguration(5, 5, 100.0f);
- conf.setBoolean(GiraphConstants.SPLIT_MASTER_WORKER, true);
+ GiraphConstants.SPLIT_MASTER_WORKER.set(conf, true);
try {
job.run(true);
@@ -208,7 +208,7 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
} catch (IllegalArgumentException e) {
}
- conf.setBoolean(GiraphConstants.SPLIT_MASTER_WORKER, false);
+ GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
try {
job.run(true);
fail();
@@ -264,7 +264,6 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
classes.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
GiraphJob job = prepareJob(callingMethod, classes, outputPath);
Configuration conf = job.getConfiguration();
- conf.setFloat(GiraphConstants.TOTAL_INPUT_SPLIT_MULTIPLIER, 2.0f);
// GeneratedInputSplit will generate 10 vertices
conf.setLong(GeneratedVertexReader.READER_VERTICES, 10);
assertTrue(job.run(true));
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-examples/src/test/java/org/apache/giraph/TestManualCheckpoint.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestManualCheckpoint.java b/giraph-examples/src/test/java/org/apache/giraph/TestManualCheckpoint.java
index eb2338c..210b78a 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestManualCheckpoint.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestManualCheckpoint.java
@@ -19,6 +19,7 @@
package org.apache.giraph;
import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.examples.SimpleCheckpointVertex;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
@@ -64,11 +65,10 @@ public class TestManualCheckpoint extends BspCase {
classes.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
- job.getConfiguration().set(GiraphConstants.CHECKPOINT_DIRECTORY,
- checkpointsDir.toString());
- job.getConfiguration().setBoolean(
- GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false);
- job.getConfiguration().setCheckpointFrequency(2);
+ GiraphConfiguration conf = job.getConfiguration();
+ GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString());
+ GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(conf, false);
+ conf.setCheckpointFrequency(2);
assertTrue(job.run(true));
@@ -97,9 +97,9 @@ public class TestManualCheckpoint extends BspCase {
classes.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
classes, outputPath);
- job.getConfiguration().setMasterComputeClass(
+ conf.setMasterComputeClass(
SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
- restartedJob.getConfiguration().set(GiraphConstants.CHECKPOINT_DIRECTORY,
+ GiraphConstants.CHECKPOINT_DIRECTORY.set(restartedJob.getConfiguration(),
checkpointsDir.toString());
assertTrue(restartedJob.run(true));
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java b/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java
index 41f5e3c..f236128 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java
@@ -64,8 +64,7 @@ public class TestPartitionContext extends BspCase {
GeneratedVertexReader.READER_VERTICES,
PartitionContextTestVertex.NUM_VERTICES);
// Increase the number of partitions
- job.getConfiguration().setInt(
- GiraphConstants.USER_PARTITION_COUNT,
+ GiraphConstants.USER_PARTITION_COUNT.set(job.getConfiguration(),
PartitionContextTestVertex.NUM_PARTITIONS);
assertTrue(job.run(true));
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java b/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
index 7deeb42..372c32f 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
@@ -19,10 +19,11 @@
package org.apache.giraph.aggregators;
import org.apache.giraph.BspCase;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.giraph.examples.AggregatorsTestVertex;
import org.apache.giraph.examples.SimpleCheckpointVertex;
import org.apache.giraph.examples.SimplePageRankVertex;
@@ -37,10 +38,6 @@ import org.apache.hadoop.util.Progressable;
import org.junit.Test;
import org.mockito.Mockito;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -49,6 +46,10 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Map;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
/** Tests if aggregators are handled on a proper way */
public class TestAggregatorsHandling extends BspCase {
@@ -165,11 +166,10 @@ public class TestAggregatorsHandling extends BspCase {
SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
- job.getConfiguration().set(GiraphConstants.CHECKPOINT_DIRECTORY,
- checkpointsDir.toString());
- job.getConfiguration().setBoolean(
- GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false);
- job.getConfiguration().setCheckpointFrequency(4);
+ GiraphConfiguration conf = job.getConfiguration();
+ GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString());
+ GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(conf, false);
+ conf.setCheckpointFrequency(4);
assertTrue(job.run(true));
@@ -187,10 +187,10 @@ public class TestAggregatorsHandling extends BspCase {
classes, outputPath);
job.getConfiguration().setMasterComputeClass(
SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
- restartedJob.getConfiguration().set(
- GiraphConstants.CHECKPOINT_DIRECTORY, checkpointsDir.toString());
- restartedJob.getConfiguration().setLong(
- GiraphConstants.RESTART_SUPERSTEP, 4);
+ GiraphConfiguration restartedJobConf = restartedJob.getConfiguration();
+ GiraphConstants.CHECKPOINT_DIRECTORY.set(restartedJobConf,
+ checkpointsDir.toString());
+ restartedJobConf.setLong(GiraphConstants.RESTART_SUPERSTEP, 4);
assertTrue(restartedJob.run(true));
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java b/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java
index f56d7e5..448afe6 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java
@@ -81,8 +81,7 @@ public class TestPageRank extends BspCase {
conf.setNumComputeThreads(numComputeThreads);
// Set enough partitions to generate randomness on the compute side
if (numComputeThreads != 1) {
- conf.setInt(GiraphConstants.USER_PARTITION_COUNT,
- numComputeThreads * 5);
+ GiraphConstants.USER_PARTITION_COUNT.set(conf, numComputeThreads * 5);
}
assertTrue(job.run(true));
if (!runningInDistributedMode()) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
index c5d3a55..c8ca8a1 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
@@ -71,7 +71,8 @@ public class TryMultiIpcBindingPortsTest {
// run internally
// fail the first port binding attempt
Map<String, String> params = Maps.<String, String>newHashMap();
- params.put(GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT, "true");
+ params.put(GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT.getKey(),
+ "true");
GiraphClasses classes = new GiraphClasses();
classes.setVertexClass(ConnectedComponentsVertex.class);
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-examples/src/test/java/org/apache/giraph/vertex/TestVertexTypes.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/vertex/TestVertexTypes.java b/giraph-examples/src/test/java/org/apache/giraph/vertex/TestVertexTypes.java
index 5ca55b6..0898d63 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/vertex/TestVertexTypes.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/vertex/TestVertexTypes.java
@@ -22,12 +22,9 @@ import org.apache.giraph.combiner.Combiner;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.ByteArrayEdges;
-import org.apache.giraph.edge.VertexEdges;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexValueFactory;
-import org.apache.giraph.io.VertexInputFormat;
-import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
import org.apache.giraph.io.formats.JsonBase64VertexInputFormat;
import org.apache.giraph.io.formats.JsonBase64VertexOutputFormat;
@@ -41,6 +38,8 @@ import org.junit.Test;
import java.io.IOException;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_FACTORY_CLASS;
+
public class TestVertexTypes {
@@ -131,7 +130,7 @@ public class TestVertexTypes {
Configuration conf = new Configuration();
conf.setInt(GiraphConstants.MAX_WORKERS, 1);
conf.setInt(GiraphConstants.MIN_WORKERS, 1);
- conf.set(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
+ conf.set(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.getKey(),
"org.apache.giraph.io.formats.DUMMY_TEST_VALUE");
return conf;
}
@@ -140,18 +139,12 @@ public class TestVertexTypes {
public void testMatchingType() throws SecurityException,
NoSuchMethodException, NoSuchFieldException {
Configuration conf = getDefaultTestConf();
- conf.setClass(GiraphConstants.VERTEX_CLASS,
- GeneratedVertexMatch.class,
- Vertex.class);
- conf.setClass(GiraphConstants.VERTEX_EDGES_CLASS,
- ByteArrayEdges.class,
- VertexEdges.class);
- conf.setClass(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
- SimpleSuperstepVertexInputFormat.class,
- VertexInputFormat.class);
- conf.setClass(GiraphConstants.VERTEX_COMBINER_CLASS,
- GeneratedVertexMatchCombiner.class,
- Combiner.class);
+ GiraphConstants.VERTEX_CLASS.set(conf, GeneratedVertexMatch.class);
+ GiraphConstants.VERTEX_EDGES_CLASS.set(conf, ByteArrayEdges.class);
+ GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf,
+ SimpleSuperstepVertexInputFormat.class);
+ GiraphConstants.VERTEX_COMBINER_CLASS.set(conf,
+ GeneratedVertexMatchCombiner.class);
@SuppressWarnings("rawtypes")
GiraphConfigurationValidator<?, ?, ?, ?> validator =
new GiraphConfigurationValidator(conf);
@@ -167,15 +160,10 @@ public class TestVertexTypes {
public void testDerivedMatchingType() throws SecurityException,
NoSuchMethodException, NoSuchFieldException {
Configuration conf = getDefaultTestConf() ;
- conf.setClass(GiraphConstants.VERTEX_CLASS,
- DerivedVertexMatch.class,
- Vertex.class);
- conf.setClass(GiraphConstants.VERTEX_EDGES_CLASS,
- ByteArrayEdges.class,
- VertexEdges.class);
- conf.setClass(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
- SimpleSuperstepVertexInputFormat.class,
- VertexInputFormat.class);
+ GiraphConstants.VERTEX_CLASS.set(conf, DerivedVertexMatch.class);
+ GiraphConstants.VERTEX_EDGES_CLASS.set(conf, ByteArrayEdges.class);
+ GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf,
+ SimpleSuperstepVertexInputFormat.class);
@SuppressWarnings("rawtypes")
GiraphConfigurationValidator<?, ?, ?, ?> validator =
new GiraphConfigurationValidator(conf);
@@ -186,15 +174,10 @@ public class TestVertexTypes {
public void testDerivedInputFormatType() throws SecurityException,
NoSuchMethodException, NoSuchFieldException {
Configuration conf = getDefaultTestConf() ;
- conf.setClass(GiraphConstants.VERTEX_CLASS,
- DerivedVertexMatch.class,
- Vertex.class);
- conf.setClass(GiraphConstants.VERTEX_EDGES_CLASS,
- ByteArrayEdges.class,
- VertexEdges.class);
- conf.setClass(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
- SimpleSuperstepVertexInputFormat.class,
- VertexInputFormat.class);
+ GiraphConstants.VERTEX_CLASS.set(conf, DerivedVertexMatch.class);
+ GiraphConstants.VERTEX_EDGES_CLASS.set(conf, ByteArrayEdges.class);
+ GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf,
+ SimpleSuperstepVertexInputFormat.class);
@SuppressWarnings("rawtypes")
GiraphConfigurationValidator<?, ?, ?, ?> validator =
new GiraphConfigurationValidator(conf);
@@ -205,36 +188,26 @@ public class TestVertexTypes {
public void testMismatchingVertex() throws SecurityException,
NoSuchMethodException, NoSuchFieldException {
Configuration conf = getDefaultTestConf() ;
- conf.setClass(GiraphConstants.VERTEX_CLASS,
- GeneratedVertexMismatch.class,
- Vertex.class);
- conf.setClass(GiraphConstants.VERTEX_EDGES_CLASS,
- ByteArrayEdges.class,
- VertexEdges.class);
- conf.setClass(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
- SimpleSuperstepVertexInputFormat.class,
- VertexInputFormat.class);
- @SuppressWarnings("rawtypes")
- GiraphConfigurationValidator<?, ?, ?, ?> validator =
- new GiraphConfigurationValidator(conf);
- validator.validateConfiguration();
+ GiraphConstants.VERTEX_CLASS.set(conf, GeneratedVertexMismatch.class);
+ GiraphConstants.VERTEX_EDGES_CLASS.set(conf, ByteArrayEdges.class);
+ GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf,
+ SimpleSuperstepVertexInputFormat.class);
+ @SuppressWarnings("rawtypes")
+ GiraphConfigurationValidator<?, ?, ?, ?> validator =
+ new GiraphConfigurationValidator(conf);
+ validator.validateConfiguration();
}
@Test(expected = IllegalArgumentException.class)
public void testMismatchingCombiner() throws SecurityException,
NoSuchMethodException, NoSuchFieldException {
Configuration conf = getDefaultTestConf() ;
- conf.setClass(GiraphConstants.VERTEX_CLASS,
- GeneratedVertexMatch.class, Vertex.class);
- conf.setClass(GiraphConstants.VERTEX_EDGES_CLASS,
- ByteArrayEdges.class,
- VertexEdges.class);
- conf.setClass(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
- SimpleSuperstepVertexInputFormat.class,
- VertexInputFormat.class);
- conf.setClass(GiraphConstants.VERTEX_COMBINER_CLASS,
- GeneratedVertexMismatchCombiner.class,
- Combiner.class);
+ GiraphConstants.VERTEX_CLASS.set(conf, GeneratedVertexMatch.class);
+ GiraphConstants.VERTEX_EDGES_CLASS.set(conf, ByteArrayEdges.class);
+ GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf,
+ SimpleSuperstepVertexInputFormat.class);
+ GiraphConstants.VERTEX_COMBINER_CLASS.set(conf,
+ GeneratedVertexMismatchCombiner.class);
@SuppressWarnings("rawtypes")
GiraphConfigurationValidator<?, ?, ?, ?> validator =
new GiraphConfigurationValidator(conf);
@@ -245,17 +218,12 @@ public class TestVertexTypes {
public void testMismatchingVertexValueFactory() throws SecurityException,
NoSuchMethodException, NoSuchFieldException {
Configuration conf = getDefaultTestConf() ;
- conf.setClass(GiraphConstants.VERTEX_CLASS,
- GeneratedVertexMatch.class, Vertex.class);
- conf.setClass(GiraphConstants.VERTEX_EDGES_CLASS,
- ByteArrayEdges.class,
- VertexEdges.class);
- conf.setClass(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
- SimpleSuperstepVertexInputFormat.class,
- VertexInputFormat.class);
- conf.setClass(GiraphConstants.VERTEX_VALUE_FACTORY_CLASS,
- GeneratedVertexMismatchValueFactory.class,
- VertexValueFactory.class);
+ GiraphConstants.VERTEX_CLASS.set(conf, GeneratedVertexMatch.class);
+ GiraphConstants.VERTEX_EDGES_CLASS.set(conf, ByteArrayEdges.class);
+ GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf,
+ SimpleSuperstepVertexInputFormat.class);
+ VERTEX_VALUE_FACTORY_CLASS.set(conf,
+ GeneratedVertexMismatchValueFactory.class);
@SuppressWarnings("rawtypes")
GiraphConfigurationValidator<?, ?, ?, ?> validator =
new GiraphConfigurationValidator(conf);
@@ -266,18 +234,12 @@ public class TestVertexTypes {
public void testJsonBase64FormatType() throws SecurityException,
NoSuchMethodException, NoSuchFieldException {
Configuration conf = getDefaultTestConf() ;
- conf.setClass(GiraphConstants.VERTEX_CLASS,
- GeneratedVertexMatch.class,
- Vertex.class);
- conf.setClass(GiraphConstants.VERTEX_EDGES_CLASS,
- ByteArrayEdges.class,
- VertexEdges.class);
- conf.setClass(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
- JsonBase64VertexInputFormat.class,
- VertexInputFormat.class);
- conf.setClass(GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS,
- JsonBase64VertexOutputFormat.class,
- VertexOutputFormat.class);
+ GiraphConstants.VERTEX_CLASS.set(conf, GeneratedVertexMatch.class);
+ GiraphConstants.VERTEX_EDGES_CLASS.set(conf, ByteArrayEdges.class);
+ GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf,
+ JsonBase64VertexInputFormat.class);
+ GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS.set(conf,
+ JsonBase64VertexOutputFormat.class);
@SuppressWarnings("rawtypes")
GiraphConfigurationValidator<?, ?, ?, ?> validator =
new GiraphConfigurationValidator(conf);
[2/3] GIRAPH-587: Refactor configuration options (nitay)
Posted by ni...@apache.org.
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 7882d06..c5b9b93 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -17,72 +17,147 @@
*/
package org.apache.giraph.conf;
+import org.apache.giraph.aggregators.AggregatorWriter;
+import org.apache.giraph.aggregators.TextAggregatorWriter;
+import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.edge.VertexEdges;
+import org.apache.giraph.graph.DefaultVertexResolver;
+import org.apache.giraph.graph.DefaultVertexValueFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.graph.VertexValueFactory;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.job.DefaultJobObserver;
+import org.apache.giraph.job.GiraphJobObserver;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.master.MasterCompute;
+import org.apache.giraph.master.MasterObserver;
+import org.apache.giraph.partition.DefaultPartitionContext;
+import org.apache.giraph.partition.GraphPartitionerFactory;
+import org.apache.giraph.partition.HashPartitionerFactory;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionContext;
+import org.apache.giraph.partition.SimplePartition;
+import org.apache.giraph.worker.DefaultWorkerContext;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.worker.WorkerObserver;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
/**
* Constants used all over Giraph for configuration.
*/
// CHECKSTYLE: stop InterfaceIsTypeCheck
public interface GiraphConstants {
+ /** 1KB in bytes */
+ int ONE_KB = 1024;
+
/** Vertex class - required */
- String VERTEX_CLASS = "giraph.vertexClass";
+ ClassConfOption<Vertex> VERTEX_CLASS =
+ ClassConfOption.create("giraph.vertexClass", null, Vertex.class);
/** Vertex value factory class - optional */
- String VERTEX_VALUE_FACTORY_CLASS = "giraph.vertexValueFactoryClass";
+ ClassConfOption<VertexValueFactory> VERTEX_VALUE_FACTORY_CLASS =
+ ClassConfOption.create("giraph.vertexValueFactoryClass",
+ DefaultVertexValueFactory.class, VertexValueFactory.class);
/** Vertex edges class - optional */
- String VERTEX_EDGES_CLASS = "giraph.vertexEdgesClass";
+ ClassConfOption<VertexEdges> VERTEX_EDGES_CLASS =
+ ClassConfOption.create("giraph.vertexEdgesClass", ByteArrayEdges.class,
+ VertexEdges.class);
/** Vertex edges class to be used during edge input only - optional */
- String INPUT_VERTEX_EDGES_CLASS = "giraph.inputVertexEdgesClass";
+ ClassConfOption<VertexEdges> INPUT_VERTEX_EDGES_CLASS =
+ ClassConfOption.create("giraph.inputVertexEdgesClass",
+ ByteArrayEdges.class, VertexEdges.class);
/** Class for Master - optional */
- String MASTER_COMPUTE_CLASS = "giraph.masterComputeClass";
+ ClassConfOption<MasterCompute> MASTER_COMPUTE_CLASS =
+ ClassConfOption.create("giraph.masterComputeClass",
+ DefaultMasterCompute.class, MasterCompute.class);
/** Classes for Master Observer - optional */
- String MASTER_OBSERVER_CLASSES = "giraph.master.observers";
+ ClassConfOption<MasterObserver> MASTER_OBSERVER_CLASSES =
+ ClassConfOption.create("giraph.master.observers",
+ null, MasterObserver.class);
/** Classes for Worker Observer - optional */
- String WORKER_OBSERVER_CLASSES = "giraph.worker.observers";
+ ClassConfOption<WorkerObserver> WORKER_OBSERVER_CLASSES =
+ ClassConfOption.create("giraph.worker.observers", null,
+ WorkerObserver.class);
/** Vertex combiner class - optional */
- String VERTEX_COMBINER_CLASS = "giraph.combinerClass";
+ ClassConfOption<Combiner> VERTEX_COMBINER_CLASS =
+ ClassConfOption.create("giraph.combinerClass", null, Combiner.class);
/** Vertex resolver class - optional */
- String VERTEX_RESOLVER_CLASS = "giraph.vertexResolverClass";
+ ClassConfOption<VertexResolver> VERTEX_RESOLVER_CLASS =
+ ClassConfOption.create("giraph.vertexResolverClass",
+ DefaultVertexResolver.class, VertexResolver.class);
+
/**
* Option of whether to create vertexes that were not existent before but
* received messages
*/
- String RESOLVER_CREATE_VERTEX_ON_MSGS =
- "giraph.vertex.resolver.create.on.msgs";
+ BooleanConfOption RESOLVER_CREATE_VERTEX_ON_MSGS =
+ new BooleanConfOption("giraph.vertex.resolver.create.on.msgs", true);
/** Graph partitioner factory class - optional */
- String GRAPH_PARTITIONER_FACTORY_CLASS =
- "giraph.graphPartitionerFactoryClass";
+ ClassConfOption<GraphPartitionerFactory> GRAPH_PARTITIONER_FACTORY_CLASS =
+ ClassConfOption.create("giraph.graphPartitionerFactoryClass",
+ HashPartitionerFactory.class, GraphPartitionerFactory.class);
/** Observer class to watch over job status - optional */
- String JOB_OBSERVER_CLASS = "giraph.jobObserverClass";
+ ClassConfOption<GiraphJobObserver> JOB_OBSERVER_CLASS =
+ ClassConfOption.create("giraph.jobObserverClass",
+ DefaultJobObserver.class, GiraphJobObserver.class);
// At least one of the input format classes is required.
/** VertexInputFormat class */
- String VERTEX_INPUT_FORMAT_CLASS = "giraph.vertexInputFormatClass";
+ ClassConfOption<VertexInputFormat> VERTEX_INPUT_FORMAT_CLASS =
+ ClassConfOption.create("giraph.vertexInputFormatClass", null,
+ VertexInputFormat.class);
/** EdgeInputFormat class */
- String EDGE_INPUT_FORMAT_CLASS = "giraph.edgeInputFormatClass";
+ ClassConfOption<EdgeInputFormat> EDGE_INPUT_FORMAT_CLASS =
+ ClassConfOption.create("giraph.edgeInputFormatClass", null,
+ EdgeInputFormat.class);
/** VertexOutputFormat class */
- String VERTEX_OUTPUT_FORMAT_CLASS = "giraph.vertexOutputFormatClass";
+ ClassConfOption<VertexOutputFormat> VERTEX_OUTPUT_FORMAT_CLASS =
+ ClassConfOption.create("giraph.vertexOutputFormatClass", null,
+ VertexOutputFormat.class);
/** Output Format Path (for Giraph-on-YARN) */
String GIRAPH_OUTPUT_DIR = "giraph.output.dir";
/** Vertex index class */
- String VERTEX_ID_CLASS = "giraph.vertexIdClass";
+ ClassConfOption<WritableComparable> VERTEX_ID_CLASS =
+ ClassConfOption.create("giraph.vertexIdClass", null,
+ WritableComparable.class);
/** Vertex value class */
- String VERTEX_VALUE_CLASS = "giraph.vertexValueClass";
+ ClassConfOption<Writable> VERTEX_VALUE_CLASS =
+ ClassConfOption.create("giraph.vertexValueClass", null, Writable.class);
/** Edge value class */
- String EDGE_VALUE_CLASS = "giraph.edgeValueClass";
+ ClassConfOption<Writable> EDGE_VALUE_CLASS =
+ ClassConfOption.create("giraph.edgeValueClass", null, Writable.class);
/** Message value class */
- String MESSAGE_VALUE_CLASS = "giraph.messageValueClass";
+ ClassConfOption<Writable> MESSAGE_VALUE_CLASS =
+ ClassConfOption.create("giraph.messageValueClass", null, Writable.class);
/** Partition context class */
- String PARTITION_CONTEXT_CLASS = "giraph.partitionContextClass";
+ ClassConfOption<PartitionContext> PARTITION_CONTEXT_CLASS =
+ ClassConfOption.create("giraph.partitionContextClass",
+ DefaultPartitionContext.class, PartitionContext.class);
/** Worker context class */
- String WORKER_CONTEXT_CLASS = "giraph.workerContextClass";
+ ClassConfOption<WorkerContext> WORKER_CONTEXT_CLASS =
+ ClassConfOption.create("giraph.workerContextClass",
+ DefaultWorkerContext.class, WorkerContext.class);
/** AggregatorWriter class - optional */
- String AGGREGATOR_WRITER_CLASS = "giraph.aggregatorWriterClass";
+ ClassConfOption<AggregatorWriter> AGGREGATOR_WRITER_CLASS =
+ ClassConfOption.create("giraph.aggregatorWriterClass",
+ TextAggregatorWriter.class, AggregatorWriter.class);
/** Partition class - optional */
- String PARTITION_CLASS = "giraph.partitionClass";
+ ClassConfOption<Partition> PARTITION_CLASS =
+ ClassConfOption.create("giraph.partitionClass", SimplePartition.class,
+ Partition.class);
/**
* Minimum number of simultaneous workers before this job can run (int)
@@ -97,54 +172,42 @@ public interface GiraphConstants {
* Separate the workers and the master tasks. This is required
* to support dynamic recovery. (boolean)
*/
- String SPLIT_MASTER_WORKER = "giraph.SplitMasterWorker";
- /**
- * Default on whether to separate the workers and the master tasks.
- * Needs to be "true" to support dynamic recovery.
- */
- boolean SPLIT_MASTER_WORKER_DEFAULT = true;
+ BooleanConfOption SPLIT_MASTER_WORKER =
+ new BooleanConfOption("giraph.SplitMasterWorker", true);
/** Indicates whether this job is run in an internal unit test */
- String LOCAL_TEST_MODE = "giraph.localTestMode";
-
- /** not in local test mode per default */
- boolean LOCAL_TEST_MODE_DEFAULT = false;
+ BooleanConfOption LOCAL_TEST_MODE =
+ new BooleanConfOption("giraph.localTestMode", false);
/** Override the Hadoop log level and set the desired log level. */
- String LOG_LEVEL = "giraph.logLevel";
- /** Default log level is INFO (same as Hadoop) */
- String LOG_LEVEL_DEFAULT = "info";
+ StrConfOption LOG_LEVEL = new StrConfOption("giraph.logLevel", "info");
/** Use thread level debugging? */
- String LOG_THREAD_LAYOUT = "giraph.logThreadLayout";
- /** Default to not use thread-level debugging */
- boolean LOG_THREAD_LAYOUT_DEFAULT = false;
+ BooleanConfOption LOG_THREAD_LAYOUT =
+ new BooleanConfOption("giraph.logThreadLayout", false);
/** Configuration key to enable jmap printing */
- String JMAP_ENABLE = "giraph.jmap.histo.enable";
- /** Default value for enabling jmap */
- boolean JMAP_ENABLE_DEFAULT = false;
+ BooleanConfOption JMAP_ENABLE =
+ new BooleanConfOption("giraph.jmap.histo.enable", false);
/** Configuration key for msec to sleep between calls */
- String JMAP_SLEEP_MILLIS = "giraph.jmap.histo.msec";
- /** Default msec to sleep between calls */
- int JMAP_SLEEP_MILLIS_DEFAULT = 30000;
+ IntConfOption JMAP_SLEEP_MILLIS =
+ new IntConfOption("giraph.jmap.histo.msec", SECONDS.toMillis(30));
/** Configuration key for how many lines to print */
- String JMAP_PRINT_LINES = "giraph.jmap.histo.print_lines";
- /** Default lines of output to print */
- int JMAP_PRINT_LINES_DEFAULT = 30;
+ IntConfOption JMAP_PRINT_LINES =
+ new IntConfOption("giraph.jmap.histo.print_lines", 30);
/**
* Minimum percent of the maximum number of workers that have responded
* in order to continue progressing. (float)
*/
- String MIN_PERCENT_RESPONDED = "giraph.minPercentResponded";
- /** Default 100% response rate for workers */
- float MIN_PERCENT_RESPONDED_DEFAULT = 100.0f;
+ FloatConfOption MIN_PERCENT_RESPONDED =
+ new FloatConfOption("giraph.minPercentResponded", 100.0f);
/** Enable the Metrics system **/
- String METRICS_ENABLE = "giraph.metrics.enable";
+ BooleanConfOption METRICS_ENABLE =
+ new BooleanConfOption("giraph.metrics.enable", false);
/**
* ZooKeeper comma-separated list (if not set,
@@ -153,24 +216,20 @@ public interface GiraphConstants {
String ZOOKEEPER_LIST = "giraph.zkList";
/** ZooKeeper session millisecond timeout */
- String ZOOKEEPER_SESSION_TIMEOUT = "giraph.zkSessionMsecTimeout";
- /** Default Zookeeper session millisecond timeout */
- int ZOOKEEPER_SESSION_TIMEOUT_DEFAULT = 60 * 1000;
+ IntConfOption ZOOKEEPER_SESSION_TIMEOUT =
+ new IntConfOption("giraph.zkSessionMsecTimeout", MINUTES.toMillis(1));
/** Polling interval to check for the ZooKeeper server data */
- String ZOOKEEPER_SERVERLIST_POLL_MSECS = "giraph.zkServerlistPollMsecs";
- /** Default polling interval to check for the ZooKeeper server data */
- int ZOOKEEPER_SERVERLIST_POLL_MSECS_DEFAULT = 3 * 1000;
+ IntConfOption ZOOKEEPER_SERVERLIST_POLL_MSECS =
+ new IntConfOption("giraph.zkServerlistPollMsecs", SECONDS.toMillis(3));
/** Number of nodes (not tasks) to run Zookeeper on */
- String ZOOKEEPER_SERVER_COUNT = "giraph.zkServerCount";
- /** Default number of nodes to run Zookeeper on */
- int ZOOKEEPER_SERVER_COUNT_DEFAULT = 1;
+ IntConfOption ZOOKEEPER_SERVER_COUNT =
+ new IntConfOption("giraph.zkServerCount", 1);
/** ZooKeeper port to use */
- String ZOOKEEPER_SERVER_PORT = "giraph.zkServerPort";
- /** Default ZooKeeper port to use */
- int ZOOKEEPER_SERVER_PORT_DEFAULT = 22181;
+ IntConfOption ZOOKEEPER_SERVER_PORT =
+ new IntConfOption("giraph.zkServerPort", 22181);
/** Location of the ZooKeeper jar - Used internally, not meant for users */
String ZOOKEEPER_JAR = "giraph.zkJar";
@@ -179,187 +238,130 @@ public interface GiraphConstants {
String ZOOKEEPER_DIR = "giraph.zkDir";
/** Max attempts for handling ZooKeeper connection loss */
- String ZOOKEEPER_OPS_MAX_ATTEMPTS = "giraph.zkOpsMaxAttempts";
- /** Default of 3 attempts for handling ZooKeeper connection loss */
- int ZOOKEEPER_OPS_MAX_ATTEMPTS_DEFAULT = 3;
+ IntConfOption ZOOKEEPER_OPS_MAX_ATTEMPTS =
+ new IntConfOption("giraph.zkOpsMaxAttempts", 3);
/**
- * Msecs to wait before retrying a failed ZooKeeper op due to connection
- * loss.
+ * Msecs to wait before retrying a failed ZooKeeper op due to connection loss.
*/
- String ZOOKEEPER_OPS_RETRY_WAIT_MSECS = "giraph.zkOpsRetryWaitMsecs";
- /**
- * Default to wait 5 seconds before retrying a failed ZooKeeper op due to
- * connection loss.
- */
- int ZOOKEEPER_OPS_RETRY_WAIT_MSECS_DEFAULT = 5 * 1000;
+ IntConfOption ZOOKEEPER_OPS_RETRY_WAIT_MSECS =
+ new IntConfOption("giraph.zkOpsRetryWaitMsecs", SECONDS.toMillis(5));
/** TCP backlog (defaults to number of workers) */
- String TCP_BACKLOG = "giraph.tcpBacklog";
- /**
- * Default TCP backlog default if the number of workers is not specified
- * (i.e unittests)
- */
- int TCP_BACKLOG_DEFAULT = 1;
+ IntConfOption TCP_BACKLOG = new IntConfOption("giraph.tcpBacklog", 1);
- /** How big to make the default buffer? */
- String NETTY_REQUEST_ENCODER_BUFFER_SIZE =
- "giraph.nettyRequestEncoderBufferSize";
- /** Start with 32K */
- int NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT = 32 * 1024;
+ /** How big to make the encoder buffer? */
+ IntConfOption NETTY_REQUEST_ENCODER_BUFFER_SIZE =
+ new IntConfOption("giraph.nettyRequestEncoderBufferSize", 32 * ONE_KB);
/** Whether or not netty request encoder should use direct byte buffers */
- String NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS =
- "giraph.nettyRequestEncoderUseDirectBuffers";
- /**
- * By default don't use direct buffers,
- * since jobs can take more than allowed heap memory in that case
- */
- boolean NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS_DEFAULT = false;
+ BooleanConfOption NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS =
+ new BooleanConfOption("giraph.nettyRequestEncoderUseDirectBuffers",
+ false);
/** Netty client threads */
- String NETTY_CLIENT_THREADS = "giraph.nettyClientThreads";
- /** Default is 4 */
- int NETTY_CLIENT_THREADS_DEFAULT = 4;
+ IntConfOption NETTY_CLIENT_THREADS =
+ new IntConfOption("giraph.nettyClientThreads", 4);
/** Netty server threads */
- String NETTY_SERVER_THREADS = "giraph.nettyServerThreads";
- /** Default is 16 */
- int NETTY_SERVER_THREADS_DEFAULT = 16;
+ IntConfOption NETTY_SERVER_THREADS =
+ new IntConfOption("giraph.nettyServerThreads", 16);
/** Use the execution handler in netty on the client? */
- String NETTY_CLIENT_USE_EXECUTION_HANDLER =
- "giraph.nettyClientUseExecutionHandler";
- /** Use the execution handler in netty on the client - default true */
- boolean NETTY_CLIENT_USE_EXECUTION_HANDLER_DEFAULT = true;
+ BooleanConfOption NETTY_CLIENT_USE_EXECUTION_HANDLER =
+ new BooleanConfOption("giraph.nettyClientUseExecutionHandler", true);
/** Netty client execution threads (execution handler) */
- String NETTY_CLIENT_EXECUTION_THREADS =
- "giraph.nettyClientExecutionThreads";
- /** Default Netty client execution threads (execution handler) of 8 */
- int NETTY_CLIENT_EXECUTION_THREADS_DEFAULT = 8;
+ IntConfOption NETTY_CLIENT_EXECUTION_THREADS =
+ new IntConfOption("giraph.nettyClientExecutionThreads", 8);
/** Where to place the netty client execution handle? */
- String NETTY_CLIENT_EXECUTION_AFTER_HANDLER =
- "giraph.nettyClientExecutionAfterHandler";
- /**
- * Default is to use the netty client execution handle after the request
- * encoder.
- */
- String NETTY_CLIENT_EXECUTION_AFTER_HANDLER_DEFAULT = "requestEncoder";
+ StrConfOption NETTY_CLIENT_EXECUTION_AFTER_HANDLER =
+ new StrConfOption("giraph.nettyClientExecutionAfterHandler",
+ "requestEncoder");
/** Use the execution handler in netty on the server? */
- String NETTY_SERVER_USE_EXECUTION_HANDLER =
- "giraph.nettyServerUseExecutionHandler";
- /** Use the execution handler in netty on the server - default true */
- boolean NETTY_SERVER_USE_EXECUTION_HANDLER_DEFAULT = true;
+ BooleanConfOption NETTY_SERVER_USE_EXECUTION_HANDLER =
+ new BooleanConfOption("giraph.nettyServerUseExecutionHandler", true);
/** Netty server execution threads (execution handler) */
- String NETTY_SERVER_EXECUTION_THREADS = "giraph.nettyServerExecutionThreads";
- /** Default Netty server execution threads (execution handler) of 8 */
- int NETTY_SERVER_EXECUTION_THREADS_DEFAULT = 8;
+ IntConfOption NETTY_SERVER_EXECUTION_THREADS =
+ new IntConfOption("giraph.nettyServerExecutionThreads", 8);
/** Where to place the netty server execution handle? */
- String NETTY_SERVER_EXECUTION_AFTER_HANDLER =
- "giraph.nettyServerExecutionAfterHandler";
- /**
- * Default is to use the netty server execution handle after the request
- * frame decoder.
- */
- String NETTY_SERVER_EXECUTION_AFTER_HANDLER_DEFAULT = "requestFrameDecoder";
+ StrConfOption NETTY_SERVER_EXECUTION_AFTER_HANDLER =
+ new StrConfOption("giraph.nettyServerExecutionAfterHandler",
+ "requestFrameDecoder");
/** Netty simulate a first request closed */
- String NETTY_SIMULATE_FIRST_REQUEST_CLOSED =
- "giraph.nettySimulateFirstRequestClosed";
- /** Default of not simulating failure for first request */
- boolean NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT = false;
+ BooleanConfOption NETTY_SIMULATE_FIRST_REQUEST_CLOSED =
+ new BooleanConfOption("giraph.nettySimulateFirstRequestClosed", false);
/** Netty simulate a first response failed */
- String NETTY_SIMULATE_FIRST_RESPONSE_FAILED =
- "giraph.nettySimulateFirstResponseFailed";
- /** Default of not simulating failure for first reponse */
- boolean NETTY_SIMULATE_FIRST_RESPONSE_FAILED_DEFAULT = false;
+ BooleanConfOption NETTY_SIMULATE_FIRST_RESPONSE_FAILED =
+ new BooleanConfOption("giraph.nettySimulateFirstResponseFailed", false);
/** Max resolve address attempts */
- String MAX_RESOLVE_ADDRESS_ATTEMPTS = "giraph.maxResolveAddressAttempts";
- /** Default max resolve address attempts */
- int MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT = 5;
+ IntConfOption MAX_RESOLVE_ADDRESS_ATTEMPTS =
+ new IntConfOption("giraph.maxResolveAddressAttempts", 5);
/** Msecs to wait between waiting for all requests to finish */
- String WAITING_REQUEST_MSECS = "giraph.waitingRequestMsecs";
- /** Default msecs to wait between waiting for all requests to finish */
- int WAITING_REQUEST_MSECS_DEFAULT = 15000;
+ IntConfOption WAITING_REQUEST_MSECS =
+ new IntConfOption("giraph.waitingRequestMsecs", SECONDS.toMillis(15));
/** Milliseconds to wait for an event before continuing */
- String EVENT_WAIT_MSECS = "giraph.eventWaitMsecs";
- /**
- * Default milliseconds to wait for an event before continuing (30 seconds)
- */
- int EVENT_WAIT_MSECS_DEFAULT = 30 * 1000;
+ IntConfOption EVENT_WAIT_MSECS =
+ new IntConfOption("giraph.eventWaitMsecs", SECONDS.toMillis(30));
/**
* Maximum milliseconds to wait before giving up trying to get the minimum
* number of workers before a superstep (int).
*/
- String MAX_MASTER_SUPERSTEP_WAIT_MSECS = "giraph.maxMasterSuperstepWaitMsecs";
- /**
- * Default maximum milliseconds to wait before giving up trying to get
- * the minimum number of workers before a superstep (10 minutes).
- */
- int MAX_MASTER_SUPERSTEP_WAIT_MSECS_DEFAULT = 10 * 60 * 1000;
+ IntConfOption MAX_MASTER_SUPERSTEP_WAIT_MSECS =
+ new IntConfOption("giraph.maxMasterSuperstepWaitMsecs",
+ MINUTES.toMillis(10));
/** Milliseconds for a request to complete (or else resend) */
- String MAX_REQUEST_MILLISECONDS = "giraph.maxRequestMilliseconds";
- /** Maximum number of milliseconds for a request to complete (10 minutes) */
- int MAX_REQUEST_MILLISECONDS_DEFAULT = 10 * 60 * 1000;
+ IntConfOption MAX_REQUEST_MILLISECONDS =
+ new IntConfOption("giraph.maxRequestMilliseconds", MINUTES.toMillis(10));
/** Netty max connection failures */
- String NETTY_MAX_CONNECTION_FAILURES = "giraph.nettyMaxConnectionFailures";
- /** Default Netty max connection failures */
- int NETTY_MAX_CONNECTION_FAILURES_DEFAULT = 1000;
+ IntConfOption NETTY_MAX_CONNECTION_FAILURES =
+ new IntConfOption("giraph.nettyMaxConnectionFailures", 1000);
/** Initial port to start using for the IPC communication */
- String IPC_INITIAL_PORT = "giraph.ipcInitialPort";
- /** Default port to start using for the IPC communication */
- int IPC_INITIAL_PORT_DEFAULT = 30000;
+ IntConfOption IPC_INITIAL_PORT =
+ new IntConfOption("giraph.ipcInitialPort", 30000);
/** Maximum bind attempts for different IPC ports */
- String MAX_IPC_PORT_BIND_ATTEMPTS = "giraph.maxIpcPortBindAttempts";
- /** Default maximum bind attempts for different IPC ports */
- int MAX_IPC_PORT_BIND_ATTEMPTS_DEFAULT = 20;
+ IntConfOption MAX_IPC_PORT_BIND_ATTEMPTS =
+ new IntConfOption("giraph.maxIpcPortBindAttempts", 20);
/**
* Fail first IPC port binding attempt, simulate binding failure
* on real grid testing
*/
- String FAIL_FIRST_IPC_PORT_BIND_ATTEMPT =
- "giraph.failFirstIpcPortBindAttempt";
- /** Default fail first IPC port binding attempt flag */
- boolean FAIL_FIRST_IPC_PORT_BIND_ATTEMPT_DEFAULT = false;
+ BooleanConfOption FAIL_FIRST_IPC_PORT_BIND_ATTEMPT =
+ new BooleanConfOption("giraph.failFirstIpcPortBindAttempt", false);
/** Client send buffer size */
- String CLIENT_SEND_BUFFER_SIZE = "giraph.clientSendBufferSize";
- /** Default client send buffer size of 0.5 MB */
- int DEFAULT_CLIENT_SEND_BUFFER_SIZE = 512 * 1024;
+ IntConfOption CLIENT_SEND_BUFFER_SIZE =
+ new IntConfOption("giraph.clientSendBufferSize", 512 * ONE_KB);
/** Client receive buffer size */
- String CLIENT_RECEIVE_BUFFER_SIZE = "giraph.clientReceiveBufferSize";
- /** Default client receive buffer size of 32 k */
- int DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE = 32 * 1024;
+ IntConfOption CLIENT_RECEIVE_BUFFER_SIZE =
+ new IntConfOption("giraph.clientReceiveBufferSize", 32 * ONE_KB);
/** Server send buffer size */
- String SERVER_SEND_BUFFER_SIZE = "giraph.serverSendBufferSize";
- /** Default server send buffer size of 32 k */
- int DEFAULT_SERVER_SEND_BUFFER_SIZE = 32 * 1024;
+ IntConfOption SERVER_SEND_BUFFER_SIZE =
+ new IntConfOption("giraph.serverSendBufferSize", 32 * ONE_KB);
/** Server receive buffer size */
- String SERVER_RECEIVE_BUFFER_SIZE = "giraph.serverReceiveBufferSize";
- /** Default server receive buffer size of 0.5 MB */
- int DEFAULT_SERVER_RECEIVE_BUFFER_SIZE = 512 * 1024;
+ IntConfOption SERVER_RECEIVE_BUFFER_SIZE =
+ new IntConfOption("giraph.serverReceiveBufferSize", 512 * ONE_KB);
/** Maximum size of messages (in bytes) per peer before flush */
- String MAX_MSG_REQUEST_SIZE = "giraph.msgRequestSize";
- /** Default maximum size of messages per peer before flush of 0.5MB */
- int MAX_MSG_REQUEST_SIZE_DEFAULT = 512 * 1024;
+ IntConfOption MAX_MSG_REQUEST_SIZE =
+ new IntConfOption("giraph.msgRequestSize", 512 * ONE_KB);
/**
* How much bigger than the average per partition size to make initial per
@@ -368,34 +370,23 @@ public interface GiraphConstants {
* and a worker has P partitions, then its initial partition buffer size
* will be (M / P) * (1 + A).
*/
- String ADDITIONAL_MSG_REQUEST_SIZE =
- "giraph.additionalMsgRequestSize";
- /**
- * Default factor for how bigger should initial per partition buffers be
- * of 20%.
- */
- float ADDITIONAL_MSG_REQUEST_SIZE_DEFAULT = 0.2f;
+ FloatConfOption ADDITIONAL_MSG_REQUEST_SIZE =
+ new FloatConfOption("giraph.additionalMsgRequestSize", 0.2f);
/** Maximum size of edges (in bytes) per peer before flush */
- String MAX_EDGE_REQUEST_SIZE = "giraph.edgeRequestSize";
- /** Default maximum size of edges per peer before flush of 0.5MB */
- int MAX_EDGE_REQUEST_SIZE_DEFAULT = 512 * 1024;
+ IntConfOption MAX_EDGE_REQUEST_SIZE =
+ new IntConfOption("giraph.edgeRequestSize", 512 * ONE_KB);
/**
* Additional size (expressed as a ratio) of each per-partition buffer on
* top of the average size.
*/
- String ADDITIONAL_EDGE_REQUEST_SIZE =
- "giraph.additionalEdgeRequestSize";
- /**
- * Default additional per-partition buffer size.
- */
- float ADDITIONAL_EDGE_REQUEST_SIZE_DEFAULT = 0.2f;
+ FloatConfOption ADDITIONAL_EDGE_REQUEST_SIZE =
+ new FloatConfOption("giraph.additionalEdgeRequestSize", 0.2f);
/** Maximum number of mutations per partition before flush */
- String MAX_MUTATIONS_PER_REQUEST = "giraph.maxMutationsPerRequest";
- /** Default maximum number of mutations per partition before flush */
- int MAX_MUTATIONS_PER_REQUEST_DEFAULT = 100;
+ IntConfOption MAX_MUTATIONS_PER_REQUEST =
+ new IntConfOption("giraph.maxMutationsPerRequest", 100);
/**
* Whether we should reuse the same Edge object when adding edges from
@@ -403,59 +394,38 @@ public interface GiraphConstants {
* This works with edge storage implementations that don't keep references
* to the input Edge objects (e.g., ByteArrayVertex).
*/
- String REUSE_INCOMING_EDGE_OBJECTS = "giraph.reuseIncomingEdgeObjects";
- /**
- * Default is to not reuse edge objects (since it's not compatible with
- * all storage implementations).
- */
- boolean REUSE_INCOMING_EDGE_OBJECTS_DEFAULT = false;
+ BooleanConfOption REUSE_INCOMING_EDGE_OBJECTS =
+ new BooleanConfOption("giraph.reuseIncomingEdgeObjects", false);
/**
* Use message size encoding (typically better for complex objects,
* not meant for primitive wrapped messages)
*/
- String USE_MESSAGE_SIZE_ENCODING = "giraph.useMessageSizeEncoding";
- /**
- * By default, do not use message size encoding as it is experimental.
- */
- boolean USE_MESSAGE_SIZE_ENCODING_DEFAULT = false;
+ BooleanConfOption USE_MESSAGE_SIZE_ENCODING =
+ new BooleanConfOption("giraph.useMessageSizeEncoding", false);
/** Number of channels used per server */
- String CHANNELS_PER_SERVER = "giraph.channelsPerServer";
- /** Default number of channels used per server of 1 */
- int DEFAULT_CHANNELS_PER_SERVER = 1;
+ IntConfOption CHANNELS_PER_SERVER =
+ new IntConfOption("giraph.channelsPerServer", 1);
/** Number of flush threads per peer */
String MSG_NUM_FLUSH_THREADS = "giraph.msgNumFlushThreads";
/** Number of threads for vertex computation */
- String NUM_COMPUTE_THREADS = "giraph.numComputeThreads";
- /** Default number of threads for vertex computation */
- int NUM_COMPUTE_THREADS_DEFAULT = 1;
+ IntConfOption NUM_COMPUTE_THREADS =
+ new IntConfOption("giraph.numComputeThreads", 1);
/** Number of threads for input splits loading */
- String NUM_INPUT_SPLITS_THREADS = "giraph.numInputSplitsThreads";
- /** Default number of threads for input splits loading */
- int NUM_INPUT_SPLITS_THREADS_DEFAULT = 1;
+ IntConfOption NUM_INPUT_SPLITS_THREADS =
+ new IntConfOption("giraph.numInputSplitsThreads", 1);
/** Minimum stragglers of the superstep before printing them out */
- String PARTITION_LONG_TAIL_MIN_PRINT = "giraph.partitionLongTailMinPrint";
- /** Only print stragglers with one as a default */
- int PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT = 1;
+ IntConfOption PARTITION_LONG_TAIL_MIN_PRINT =
+ new IntConfOption("giraph.partitionLongTailMinPrint", 1);
/** Use superstep counters? (boolean) */
- String USE_SUPERSTEP_COUNTERS = "giraph.useSuperstepCounters";
- /** Default is to use the superstep counters */
- boolean USE_SUPERSTEP_COUNTERS_DEFAULT = true;
-
- /**
- * Set the multiplicative factor of how many partitions to create from
- * a single InputSplit based on the number of total InputSplits. For
- * example, if there are 10 total InputSplits and this is set to 0.5, then
- * you will get 0.5 * 10 = 5 partitions for every InputSplit (given that the
- * minimum size is met).
- */
- String TOTAL_INPUT_SPLIT_MULTIPLIER = "giraph.totalInputSplitMultiplier";
+ BooleanConfOption USE_SUPERSTEP_COUNTERS =
+ new BooleanConfOption("giraph.useSuperstepCounters", true);
/**
* Input split sample percent - Used only for sampling and testing, rather
@@ -463,31 +433,24 @@ public interface GiraphConstants {
* fraction of the actual input splits from your VertexInputFormat to
* load (values should be [0, 100]).
*/
- String INPUT_SPLIT_SAMPLE_PERCENT = "giraph.inputSplitSamplePercent";
- /** Default is to use all the input splits */
- float INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT = 100f;
+ FloatConfOption INPUT_SPLIT_SAMPLE_PERCENT =
+ new FloatConfOption("giraph.inputSplitSamplePercent", 100f);
/**
* To limit outlier vertex input splits from producing too many vertices or
* to help with testing, the number of vertices loaded from an input split
* can be limited. By default, everything is loaded.
*/
- String INPUT_SPLIT_MAX_VERTICES = "giraph.InputSplitMaxVertices";
- /**
- * Default is that all the vertices are to be loaded from the input split
- */
- long INPUT_SPLIT_MAX_VERTICES_DEFAULT = -1;
+ LongConfOption INPUT_SPLIT_MAX_VERTICES =
+ new LongConfOption("giraph.InputSplitMaxVertices", -1);
/**
* To limit outlier vertex input splits from producing too many vertices or
* to help with testing, the number of edges loaded from an input split
* can be limited. By default, everything is loaded.
*/
- String INPUT_SPLIT_MAX_EDGES = "giraph.InputSplitMaxEdges";
- /**
- * Default is that all the edges are to be loaded from the input split
- */
- long INPUT_SPLIT_MAX_EDGES_DEFAULT = -1;
+ LongConfOption INPUT_SPLIT_MAX_EDGES =
+ new LongConfOption("giraph.InputSplitMaxEdges", -1);
/**
* To minimize network usage when reading input splits,
@@ -496,24 +459,16 @@ public interface GiraphConstants {
* Hence, users with a lot of splits and input threads (or with
* configurations that can't exploit locality) may want to disable it.
*/
- String USE_INPUT_SPLIT_LOCALITY = "giraph.useInputSplitLocality";
-
- /**
- * Default is to prioritize local input splits.
- */
- boolean USE_INPUT_SPLIT_LOCALITY_DEFAULT = true;
+ BooleanConfOption USE_INPUT_SPLIT_LOCALITY =
+ new BooleanConfOption("giraph.useInputSplitLocality", true);
/** Multiplier for the current workers squared */
- String PARTITION_COUNT_MULTIPLIER =
- "partition.masterPartitionCountMultipler";
- /** Default mulitplier for current workers squared */
- float DEFAULT_PARTITION_COUNT_MULTIPLIER = 1.0f;
+ FloatConfOption PARTITION_COUNT_MULTIPLIER =
+ new FloatConfOption("partition.masterPartitionCountMultipler", 1.0f);
/** Overrides default partition count calculation if not -1 */
- String USER_PARTITION_COUNT =
- "partition.userPartitionCount";
- /** Default user partition count */
- int DEFAULT_USER_PARTITION_COUNT = -1;
+ IntConfOption USER_PARTITION_COUNT =
+ new IntConfOption("partition.userPartitionCount", -1);
/** Vertex key space size for
* {@link org.apache.giraph.partition.SimpleRangeWorkerPartitioner}
@@ -521,28 +476,23 @@ public interface GiraphConstants {
String PARTITION_VERTEX_KEY_SPACE_SIZE = "partition.vertexKeySpaceSize";
/** Java opts passed to ZooKeeper startup */
- String ZOOKEEPER_JAVA_OPTS = "giraph.zkJavaOpts";
- /** Default java opts passed to ZooKeeper startup */
- String ZOOKEEPER_JAVA_OPTS_DEFAULT =
- "-Xmx512m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC " +
- "-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxGCPauseMillis=100";
+ StrConfOption ZOOKEEPER_JAVA_OPTS =
+ new StrConfOption("giraph.zkJavaOpts",
+ "-Xmx512m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC " +
+ "-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxGCPauseMillis=100");
/**
* How often to checkpoint (i.e. 0, means no checkpoint,
* 1 means every superstep, 2 is every two supersteps, etc.).
*/
- String CHECKPOINT_FREQUENCY = "giraph.checkpointFrequency";
-
- /** Default checkpointing frequency of none. */
- int CHECKPOINT_FREQUENCY_DEFAULT = 0;
+ IntConfOption CHECKPOINT_FREQUENCY =
+ new IntConfOption("giraph.checkpointFrequency", 0);
/**
* Delete checkpoints after a successful job run?
*/
- String CLEANUP_CHECKPOINTS_AFTER_SUCCESS =
- "giraph.cleanupCheckpointsAfterSuccess";
- /** Default is to clean up the checkponts after a successful job */
- boolean CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT = true;
+ BooleanConfOption CLEANUP_CHECKPOINTS_AFTER_SUCCESS =
+ new BooleanConfOption("giraph.cleanupCheckpointsAfterSuccess", true);
/**
* An application can be restarted manually by selecting a superstep. The
@@ -561,76 +511,56 @@ public interface GiraphConstants {
* If ZOOKEEPER_LIST is not set, then use this directory to manage
* ZooKeeper
*/
- String ZOOKEEPER_MANAGER_DIRECTORY = "giraph.zkManagerDirectory";
- /**
- * Default ZooKeeper manager directory (where determining the servers in
- * HDFS files will go). directory path will also have job number
- * for uniqueness.
- */
- String ZOOKEEPER_MANAGER_DIR_DEFAULT = "_bsp/_defaultZkManagerDir";
+ StrConfOption ZOOKEEPER_MANAGER_DIRECTORY =
+ new StrConfOption("giraph.zkManagerDirectory",
+ "_bsp/_defaultZkManagerDir");
/** Number of ZooKeeper client connection attempts before giving up. */
- String ZOOKEEPER_CONNECTION_ATTEMPTS = "giraph.zkConnectionAttempts";
- /** Default of 10 ZooKeeper client connection attempts before giving up. */
- int ZOOKEEPER_CONNECTION_ATTEMPTS_DEFAULT = 10;
+ IntConfOption ZOOKEEPER_CONNECTION_ATTEMPTS =
+ new IntConfOption("giraph.zkConnectionAttempts", 10);
/** This directory has/stores the available checkpoint files in HDFS. */
- String CHECKPOINT_DIRECTORY = "giraph.checkpointDirectory";
- /**
- * Default checkpoint directory (where checkpoing files go in HDFS). Final
- * directory path will also have the job number for uniqueness
- */
- String CHECKPOINT_DIRECTORY_DEFAULT = "_bsp/_checkpoints/";
+ StrConfOption CHECKPOINT_DIRECTORY =
+ new StrConfOption("giraph.checkpointDirectory", "_bsp/_checkpoints/");
/**
* Comma-separated list of directories in the local file system for
* out-of-core messages.
*/
- String MESSAGES_DIRECTORY = "giraph.messagesDirectory";
- /**
- * Default messages directory. directory path will also have the
- * job number for uniqueness
- */
- String MESSAGES_DIRECTORY_DEFAULT = "_bsp/_messages/";
+ StrConfOption MESSAGES_DIRECTORY =
+ new StrConfOption("giraph.messagesDirectory", "_bsp/_messages/");
/** Whether or not to use out-of-core messages */
- String USE_OUT_OF_CORE_MESSAGES = "giraph.useOutOfCoreMessages";
- /** Default choice about using out-of-core messaging */
- boolean USE_OUT_OF_CORE_MESSAGES_DEFAULT = false;
+ BooleanConfOption USE_OUT_OF_CORE_MESSAGES =
+ new BooleanConfOption("giraph.useOutOfCoreMessages", false);
/**
* If using out-of-core messaging, it tells how much messages do we keep
* in memory.
*/
- String MAX_MESSAGES_IN_MEMORY = "giraph.maxMessagesInMemory";
- /** Default maximum number of messages in memory. */
- int MAX_MESSAGES_IN_MEMORY_DEFAULT = 1000000;
+ IntConfOption MAX_MESSAGES_IN_MEMORY =
+ new IntConfOption("giraph.maxMessagesInMemory", 1000000);
/** Size of buffer when reading and writing messages out-of-core. */
- String MESSAGES_BUFFER_SIZE = "giraph.messagesBufferSize";
- /** Default size of buffer when reading and writing messages out-of-core. */
- int MESSAGES_BUFFER_SIZE_DEFAULT = 8192;
+ IntConfOption MESSAGES_BUFFER_SIZE =
+ new IntConfOption("giraph.messagesBufferSize", 8 * ONE_KB);
/**
* Comma-separated list of directories in the local filesystem for
* out-of-core partitions.
*/
- String PARTITIONS_DIRECTORY = "giraph.partitionsDirectory";
- /** Default directory for out-of-core partitions. */
- String PARTITIONS_DIRECTORY_DEFAULT = "_bsp/_partitions";
+ StrConfOption PARTITIONS_DIRECTORY =
+ new StrConfOption("giraph.partitionsDirectory", "_bsp/_partitions");
/** Enable out-of-core graph. */
- String USE_OUT_OF_CORE_GRAPH = "giraph.useOutOfCoreGraph";
- /** Default is not to use out-of-core graph. */
- boolean USE_OUT_OF_CORE_GRAPH_DEFAULT = false;
+ BooleanConfOption USE_OUT_OF_CORE_GRAPH =
+ new BooleanConfOption("giraph.useOutOfCoreGraph", false);
/** Maximum number of partitions to hold in memory for each worker. */
- String MAX_PARTITIONS_IN_MEMORY = "giraph.maxPartitionsInMemory";
- /** Default maximum number of in-memory partitions. */
- int MAX_PARTITIONS_IN_MEMORY_DEFAULT = 10;
+ IntConfOption MAX_PARTITIONS_IN_MEMORY =
+ new IntConfOption("giraph.maxPartitionsInMemory", 10);
/** Keep the zookeeper output for debugging? Default is to remove it. */
- String KEEP_ZOOKEEPER_DATA = "giraph.keepZooKeeperData";
- /** Default is to remove ZooKeeper data. */
- Boolean KEEP_ZOOKEEPER_DATA_DEFAULT = false;
+ BooleanConfOption KEEP_ZOOKEEPER_DATA =
+ new BooleanConfOption("giraph.keepZooKeeperData", false);
/** Default ZooKeeper tick time. */
int DEFAULT_ZOOKEEPER_TICK_TIME = 6000;
@@ -643,57 +573,50 @@ public interface GiraphConstants {
/** Default ZooKeeper maximum client connections. */
int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 10000;
/** ZooKeeper minimum session timeout */
- String ZOOKEEPER_MIN_SESSION_TIMEOUT = "giraph.zKMinSessionTimeout";
- /** Default ZooKeeper minimum session timeout of 10 minutes (in msecs). */
- int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 600 * 1000;
+ IntConfOption ZOOKEEPER_MIN_SESSION_TIMEOUT =
+ new IntConfOption("giraph.zKMinSessionTimeout", MINUTES.toMillis(10));
/** ZooKeeper maximum session timeout */
- String ZOOKEEPER_MAX_SESSION_TIMEOUT = "giraph.zkMaxSessionTimeout";
- /** Default ZooKeeper maximum session timeout of 15 minutes (in msecs). */
- int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 900 * 1000;
+ IntConfOption ZOOKEEPER_MAX_SESSION_TIMEOUT =
+ new IntConfOption("giraph.zkMaxSessionTimeout", MINUTES.toMillis(15));
/** ZooKeeper force sync */
- String ZOOKEEPER_FORCE_SYNC = "giraph.zKForceSync";
- /** Default ZooKeeper force sync is off (for performance) */
- String DEFAULT_ZOOKEEPER_FORCE_SYNC = "no";
+ StrConfOption ZOOKEEPER_FORCE_SYNC =
+ new StrConfOption("giraph.zKForceSync", "no");
/** ZooKeeper skip ACLs */
- String ZOOKEEPER_SKIP_ACL = "giraph.ZkSkipAcl";
- /** Default ZooKeeper skip ACLs true (for performance) */
- String DEFAULT_ZOOKEEPER_SKIP_ACL = "yes";
+ StrConfOption ZOOKEEPER_SKIP_ACL =
+ new StrConfOption("giraph.ZkSkipAcl", "yes");
/**
* Whether to use SASL with DIGEST and Hadoop Job Tokens to authenticate
* and authorize Netty BSP Clients to Servers.
*/
- String AUTHENTICATE = "giraph.authenticate";
- /** Default is not to do authenticate and authorization with Netty. */
- boolean DEFAULT_AUTHENTICATE = false;
+ BooleanConfOption AUTHENTICATE =
+ new BooleanConfOption("giraph.authenticate", false);
/** Use unsafe serialization? */
- String USE_UNSAFE_SERIALIZATION = "giraph.useUnsafeSerialization";
- /**
- * Use unsafe serialization default is true (use it if you can,
- * its much faster)!
- */
- boolean USE_UNSAFE_SERIALIZATION_DEFAULT = true;
+ BooleanConfOption USE_UNSAFE_SERIALIZATION =
+ new BooleanConfOption("giraph.useUnsafeSerialization", true);
/**
* Maximum number of attempts a master/worker will retry before killing
* the job. This directly maps to the number of map task attempts in
* Hadoop.
*/
- String MAX_TASK_ATTEMPTS = "mapred.map.max.attempts";
+ IntConfOption MAX_TASK_ATTEMPTS =
+ new IntConfOption("mapred.map.max.attempts", -1);
/** Interface to use for hostname resolution */
- String DNS_INTERFACE = "giraph.dns.interface";
+ StrConfOption DNS_INTERFACE =
+ new StrConfOption("giraph.dns.interface", "default");
/** Server for hostname resolution */
- String DNS_NAMESERVER = "giraph.dns.nameserver";
+ StrConfOption DNS_NAMESERVER =
+ new StrConfOption("giraph.dns.nameserver", "default");
/**
* The application will halt after this many supersteps is completed. For
* instance, if it is set to 3, the application will run at most 0, 1,
* and 2 supersteps and then go into the shutdown superstep.
*/
- String MAX_NUMBER_OF_SUPERSTEPS = "giraph.maxNumberOfSupersteps";
- /** By default, the number of supersteps is not limited */
- int MAX_NUMBER_OF_SUPERSTEPS_DEFAULT = -1;
+ IntConfOption MAX_NUMBER_OF_SUPERSTEPS =
+ new IntConfOption("giraph.maxNumberOfSupersteps", -1);
}
// CHECKSTYLE: resume InterfaceIsTypeCheck
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index de85ab6..0af8b97 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -53,6 +53,8 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;
+import static org.apache.giraph.conf.GiraphConstants.USE_UNSAFE_SERIALIZATION;
+
/**
* The classes set here are immutable, the remaining configuration is mutable.
* Classes are immutable and final to provide the best performance for
@@ -87,8 +89,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
public ImmutableClassesGiraphConfiguration(Configuration conf) {
super(conf);
classes = new GiraphClasses(conf);
- useUnsafeSerialization = getBoolean(USE_UNSAFE_SERIALIZATION,
- USE_UNSAFE_SERIALIZATION_DEFAULT);
+ useUnsafeSerialization = USE_UNSAFE_SERIALIZATION.get(this);
try {
vertexValueFactory = (VertexValueFactory<V>)
classes.getVertexValueFactoryClass().newInstance();
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java
new file mode 100644
index 0000000..de75e9d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Integer configuration option
+ */
+public class IntConfOption extends AbstractConfOption {
+  /** Default value */
+  private final int defaultValue;
+
+  /**
+   * Constructor
+   *
+   * @param key key
+   * @param defaultValue default value
+   */
+  public IntConfOption(String key, int defaultValue) {
+    super(key);
+    this.defaultValue = defaultValue;
+    AllOptions.add(this);
+  }
+
+  /**
+   * Constructor accepting a default that was computed as a long (for
+   * example a millisecond count). The value is stored as an int, so it is
+   * range-checked instead of being silently truncated.
+   *
+   * @param key key
+   * @param defaultValue default value, must fit in an int
+   * @throws IllegalArgumentException if defaultValue is outside the int range
+   */
+  public IntConfOption(String key, long defaultValue) {
+    this(key, checkedIntCast(key, defaultValue));
+  }
+
+  /**
+   * Narrow a long to an int, rejecting values the cast would corrupt.
+   *
+   * @param key key the value belongs to, used only for the error message
+   * @param value value to narrow
+   * @return value as an int
+   * @throws IllegalArgumentException if value is outside the int range
+   */
+  private static int checkedIntCast(String key, long value) {
+    if (value < Integer.MIN_VALUE || value > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException("IntConfOption: default value " +
+          value + " for " + key + " does not fit in an int");
+    }
+    return (int) value;
+  }
+
+  /**
+   * Get the default value
+   *
+   * @return default value used when the key is not set
+   */
+  public int getDefaultValue() {
+    return defaultValue;
+  }
+
+  /**
+   * Get the default value as a string
+   *
+   * @return decimal string form of the default value
+   */
+  @Override public String getDefaultValueStr() {
+    return Integer.toString(defaultValue);
+  }
+
+  /**
+   * Get the type of this option
+   *
+   * @return ConfOptionType.INTEGER
+   */
+  @Override public ConfOptionType getType() {
+    return ConfOptionType.INTEGER;
+  }
+
+  /**
+   * Lookup value
+   *
+   * @param conf Configuration
+   * @return value for key, or default value if not set
+   */
+  public int get(Configuration conf) {
+    return conf.getInt(getKey(), defaultValue);
+  }
+
+  /**
+   * Set value
+   *
+   * @param conf Configuration
+   * @param value to set
+   */
+  public void set(Configuration conf, int value) {
+    conf.setInt(getKey(), value);
+  }
+
+  /**
+   * Set value if it's not already present
+   *
+   * @param conf Configuration
+   * @param value to set
+   */
+  public void setIfUnset(Configuration conf, int value) {
+    if (conf.get(getKey()) == null) {
+      conf.setInt(getKey(), value);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java
new file mode 100644
index 0000000..0cbc164
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Long configuration option
+ */
+public class LongConfOption extends AbstractConfOption {
+  /** Default value, fixed at construction */
+  private final long defaultValue;
+
+  /**
+   * Constructor
+   *
+   * @param key key
+   * @param defaultValue default value
+   */
+  public LongConfOption(String key, long defaultValue) {
+    super(key);
+    this.defaultValue = defaultValue;
+    AllOptions.add(this);
+  }
+
+  /**
+   * Get the default value
+   *
+   * @return default value used when the key is not set
+   */
+  public long getDefaultValue() {
+    return defaultValue;
+  }
+
+  /**
+   * Get the default value as a string
+   *
+   * @return decimal string form of the default value
+   */
+  @Override public String getDefaultValueStr() {
+    return Long.toString(defaultValue);
+  }
+
+  /**
+   * Get the type of this option
+   *
+   * @return ConfOptionType.LONG
+   */
+  @Override public ConfOptionType getType() {
+    return ConfOptionType.LONG;
+  }
+
+  /**
+   * Lookup value
+   *
+   * @param conf Configuration
+   * @return value set for key, or defaultValue
+   */
+  public long get(Configuration conf) {
+    return conf.getLong(getKey(), defaultValue);
+  }
+
+  /**
+   * Set value for key
+   *
+   * @param conf Configuration
+   * @param value to set
+   */
+  public void set(Configuration conf, long value) {
+    conf.setLong(getKey(), value);
+  }
+
+  /**
+   * Set value if it's not already present
+   *
+   * @param conf Configuration
+   * @param value to set
+   */
+  public void setIfUnset(Configuration conf, long value) {
+    if (conf.get(getKey()) == null) {
+      conf.setLong(getKey(), value);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java
new file mode 100644
index 0000000..83a583d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * String Configuration option
+ */
+public class StrConfOption extends AbstractConfOption {
+  /** Value handed back when the key is absent from the configuration */
+  private final String defaultValue;
+
+  /**
+   * Create a string option and add it to AllOptions.
+   *
+   * @param key configuration key
+   * @param defaultValue value to fall back on when the key is not set
+   */
+  public StrConfOption(String key, String defaultValue) {
+    super(key);
+    this.defaultValue = defaultValue;
+    AllOptions.add(this);
+  }
+
+  /**
+   * Fallback value of this option.
+   *
+   * @return the default passed to the constructor
+   */
+  public String getDefaultValue() {
+    return defaultValue;
+  }
+
+  /**
+   * Fallback value of this option in string form.
+   *
+   * @return the default passed to the constructor, unchanged
+   */
+  @Override public String getDefaultValueStr() {
+    return defaultValue;
+  }
+
+  /**
+   * Kind of option this is.
+   *
+   * @return ConfOptionType.STRING
+   */
+  @Override public ConfOptionType getType() {
+    return ConfOptionType.STRING;
+  }
+
+  /**
+   * Read this option from a configuration.
+   *
+   * @param conf Configuration to read from
+   * @return configured value, or this option's default
+   */
+  public String get(Configuration conf) {
+    String key = getKey();
+    return conf.get(key, defaultValue);
+  }
+
+  /**
+   * Read this option from a configuration, falling back on a caller
+   * supplied default rather than this option's own.
+   *
+   * @param conf Configuration to read from
+   * @param defaultVal fallback to use instead of the option default
+   * @return configured value, or defaultVal
+   */
+  public String getWithDefault(Configuration conf, String defaultVal) {
+    String key = getKey();
+    return conf.get(key, defaultVal);
+  }
+
+  /**
+   * Read this option as an array of strings.
+   *
+   * @param conf Configuration to read from
+   * @return result of Configuration#getStrings for the key, with this
+   *         option's default supplied as the fallback
+   */
+  public String[] getArray(Configuration conf) {
+    String key = getKey();
+    return conf.getStrings(key, defaultValue);
+  }
+
+  /**
+   * Read this option as a list of strings.
+   *
+   * @param conf Configuration to read from
+   * @return new list holding the elements of {@link #getArray}
+   */
+  public List<String> getList(Configuration conf) {
+    String[] values = getArray(conf);
+    return Lists.newArrayList(values);
+  }
+
+  /**
+   * Store a value under this option's key.
+   *
+   * @param conf Configuration to write to
+   * @param value value to store
+   */
+  public void set(Configuration conf, String value) {
+    String key = getKey();
+    conf.set(key, value);
+  }
+
+  /**
+   * Store a value under this option's key unless one is already there.
+   *
+   * @param conf Configuration to write to
+   * @param value value to store
+   */
+  public void setIfUnset(Configuration conf, String value) {
+    String key = getKey();
+    conf.setIfUnset(key, value);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index e74c59a..57f7dff 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -18,8 +18,6 @@
package org.apache.giraph.graph;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -57,6 +55,9 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URL;
@@ -72,6 +73,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import static org.apache.giraph.conf.GiraphConstants.EDGE_VALUE_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.MESSAGE_VALUE_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_ID_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_CLASS;
+
/**
* The Giraph-specific business logic for a single BSP
* compute node in whatever underlying type of cluster
@@ -440,18 +446,10 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
Type vertexValueType = classList.get(1);
Type edgeValueType = classList.get(2);
Type messageValueType = classList.get(3);
- conf.setClass(GiraphConstants.VERTEX_ID_CLASS,
- (Class<?>) vertexIndexType,
- WritableComparable.class);
- conf.setClass(GiraphConstants.VERTEX_VALUE_CLASS,
- (Class<?>) vertexValueType,
- Writable.class);
- conf.setClass(GiraphConstants.EDGE_VALUE_CLASS,
- (Class<?>) edgeValueType,
- Writable.class);
- conf.setClass(GiraphConstants.MESSAGE_VALUE_CLASS,
- (Class<?>) messageValueType,
- Writable.class);
+ VERTEX_ID_CLASS.set(conf, (Class<WritableComparable>) vertexIndexType);
+ VERTEX_VALUE_CLASS.set(conf, (Class<Writable>) vertexValueType);
+ EDGE_VALUE_CLASS.set(conf, (Class<Writable>) edgeValueType);
+ MESSAGE_VALUE_CLASS.set(conf, (Class<Writable>) messageValueType);
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
index 1e05773..f55cf18 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
@@ -21,7 +21,6 @@ package org.apache.giraph.job;
import org.apache.giraph.combiner.Combiner;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.edge.VertexEdges;
import org.apache.giraph.graph.DefaultVertexResolver;
import org.apache.giraph.graph.DefaultVertexValueFactory;
@@ -40,6 +39,9 @@ import org.apache.log4j.Logger;
import java.lang.reflect.Type;
import java.util.List;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_EDGES_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_RESOLVER_CLASS;
+
/**
* GiraphConfigurationValidator attempts to verify the consistency of
* user-chosen InputFormat, OutputFormat, and Vertex generic type
@@ -138,34 +140,36 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
conf.getMinPercentResponded() > 100.0f) {
throw new IllegalArgumentException(
"checkConfiguration: Invalid " + conf.getMinPercentResponded() +
- " for " + GiraphConstants.MIN_PERCENT_RESPONDED);
+ " for " + GiraphConstants.MIN_PERCENT_RESPONDED.getKey());
}
if (conf.getMinWorkers() < 0) {
throw new IllegalArgumentException("checkConfiguration: No valid " +
GiraphConstants.MIN_WORKERS);
}
if (conf.getVertexClass() == null) {
- throw new IllegalArgumentException("checkConfiguration: Null" +
- GiraphConstants.VERTEX_CLASS);
+ throw new IllegalArgumentException("checkConfiguration: Null " +
+ GiraphConstants.VERTEX_CLASS.getKey());
}
if (conf.getVertexInputFormatClass() == null &&
conf.getEdgeInputFormatClass() == null) {
throw new IllegalArgumentException("checkConfiguration: One of " +
- GiraphConstants.VERTEX_INPUT_FORMAT_CLASS + " and " +
- GiraphConstants.EDGE_INPUT_FORMAT_CLASS + " must be non-null");
+ GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.getKey() + " and " +
+ GiraphConstants.EDGE_INPUT_FORMAT_CLASS.getKey() +
+ " must be non-null");
}
if (conf.getVertexResolverClass() == null) {
if (LOG.isInfoEnabled()) {
LOG.info("checkConfiguration: No class found for " +
- GiraphConstants.VERTEX_RESOLVER_CLASS + ", defaulting to " +
- DefaultVertexResolver.class.getCanonicalName());
+ VERTEX_RESOLVER_CLASS.getKey() +
+ ", defaulting to " +
+ VERTEX_RESOLVER_CLASS.getDefaultClass().getCanonicalName());
}
}
if (conf.getVertexEdgesClass() == null) {
if (LOG.isInfoEnabled()) {
LOG.info("checkConfiguration: No class found for " +
- GiraphConstants.VERTEX_EDGES_CLASS + ", defaulting to " +
- ByteArrayEdges.class.getCanonicalName());
+ VERTEX_EDGES_CLASS.getKey() + ", defaulting to " +
+ VERTEX_EDGES_CLASS.getDefaultClass().getCanonicalName());
}
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
index da85d1c..6849ca3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
@@ -214,7 +214,7 @@ public class GiraphJob {
if (LOG.isInfoEnabled()) {
LOG.info("run: Since checkpointing is disabled (default), " +
"do not allow any task retries (setting " +
- GiraphConstants.MAX_TASK_ATTEMPTS + " = 0, " +
+ GiraphConstants.MAX_TASK_ATTEMPTS.getKey() + " = 0, " +
"old value = " + oldMaxTaskAttempts + ")");
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 6c979d6..404e47e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -104,6 +104,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT;
+import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA;
+import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT;
+import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY;
+
/**
* ZooKeeper-based implementation of {@link CentralizedServiceMaster}.
*
@@ -201,13 +206,10 @@ public class BspServiceMaster<I extends WritableComparable,
maxWorkers = conf.getMaxWorkers();
minWorkers = conf.getMinWorkers();
maxNumberOfSupersteps = conf.getMaxNumberOfSupersteps();
- minPercentResponded = conf.getFloat(
- GiraphConstants.MIN_PERCENT_RESPONDED, 100.0f);
+ minPercentResponded = GiraphConstants.MIN_PERCENT_RESPONDED.get(conf);
eventWaitMsecs = conf.getEventWaitMsecs();
maxSuperstepWaitMsecs = conf.getMaxMasterSuperstepWaitMsecs();
- partitionLongTailMinPrint = conf.getInt(
- GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT,
- GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT);
+ partitionLongTailMinPrint = PARTITION_LONG_TAIL_MIN_PRINT.get(conf);
masterGraphPartitioner =
getGraphPartitionerFactory().createMasterGraphPartitioner();
if (conf.isJMapHistogramDumpEnabled()) {
@@ -316,11 +318,8 @@ public class BspServiceMaster<I extends WritableComparable,
logPrefix + ": Got InterruptedException", e);
}
float samplePercent =
- getConfiguration().getFloat(
- GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT,
- GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT);
- if (samplePercent !=
- GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT) {
+ INPUT_SPLIT_SAMPLE_PERCENT.get(getConfiguration());
+ if (samplePercent != INPUT_SPLIT_SAMPLE_PERCENT.getDefaultValue()) {
int lastIndex = (int) (samplePercent * splits.size() / 100f);
List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
LOG.warn(logPrefix + ": Using sampling - Processing only " +
@@ -579,6 +578,7 @@ public class BspServiceMaster<I extends WritableComparable,
private int createInputSplits(GiraphInputFormat inputFormat,
InputSplitPaths inputSplitPaths,
String inputSplitType) {
+ ImmutableClassesGiraphConfiguration conf = getConfiguration();
String logPrefix = "create" + inputSplitType + "InputSplits";
// Only the 'master' should be doing this. Wait until the number of
// processes that have reported health exceeds the minimum percentage.
@@ -612,7 +612,7 @@ public class BspServiceMaster<I extends WritableComparable,
// Create at least as many splits as the total number of input threads.
int minSplitCountHint = healthyWorkerInfoList.size() *
- getConfiguration().getNumInputSplitsThreads();
+ conf.getNumInputSplitsThreads();
// Note that the input splits may only be a sample if
// INPUT_SPLIT_SAMPLE_PERCENT is set to something other than 100
@@ -635,8 +635,7 @@ public class BspServiceMaster<I extends WritableComparable,
}
// Write input splits to zookeeper in parallel
- int inputSplitThreadCount = getConfiguration().getInt(
- INPUT_SPLIT_THREAD_COUNT,
+ int inputSplitThreadCount = conf.getInt(INPUT_SPLIT_THREAD_COUNT,
DEFAULT_INPUT_SPLIT_THREAD_COUNT);
if (LOG.isInfoEnabled()) {
LOG.info(logPrefix + ": Starting to write input split data " +
@@ -644,9 +643,7 @@ public class BspServiceMaster<I extends WritableComparable,
}
ExecutorService taskExecutor =
Executors.newFixedThreadPool(inputSplitThreadCount);
- boolean writeLocations = getConfiguration().getBoolean(
- GiraphConstants.USE_INPUT_SPLIT_LOCALITY,
- GiraphConstants.USE_INPUT_SPLIT_LOCALITY_DEFAULT);
+ boolean writeLocations = USE_INPUT_SPLIT_LOCALITY.get(conf);
for (int i = 0; i < splitList.size(); ++i) {
InputSplit inputSplit = splitList.get(i);
taskExecutor.submit(new WriteInputSplit(inputSplit, inputSplitsPath, i,
@@ -1356,9 +1353,7 @@ public class BspServiceMaster<I extends WritableComparable,
*/
private void cleanUpOldSuperstep(long removeableSuperstep) throws
InterruptedException {
- if (!(getConfiguration().getBoolean(
- GiraphConstants.KEEP_ZOOKEEPER_DATA,
- GiraphConstants.KEEP_ZOOKEEPER_DATA_DEFAULT)) &&
+ if (KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration()) &&
(removeableSuperstep >= 0)) {
String oldSuperstepPath =
getSuperstepPath(getApplicationAttempt()) + "/" +
@@ -1523,7 +1518,7 @@ public class BspServiceMaster<I extends WritableComparable,
// If we have completed the maximum number of supersteps, stop
// the computation
if (maxNumberOfSupersteps !=
- GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS_DEFAULT &&
+ GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS.getDefaultValue() &&
(getSuperstep() == maxNumberOfSupersteps - 1)) {
if (LOG.isInfoEnabled()) {
LOG.info("coordinateSuperstep: Finished " + maxNumberOfSupersteps +
@@ -1661,9 +1656,7 @@ public class BspServiceMaster<I extends WritableComparable,
// provided (not dynamically started) and we don't want to keep the data
try {
if (getConfiguration().getZookeeperList() != null &&
- !getConfiguration().getBoolean(
- GiraphConstants.KEEP_ZOOKEEPER_DATA,
- GiraphConstants.KEEP_ZOOKEEPER_DATA_DEFAULT)) {
+ KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration())) {
if (LOG.isInfoEnabled()) {
LOG.info("cleanupZooKeeper: Removing the following path " +
"and all children - " + basePath + " from ZooKeeper list " +
@@ -1713,6 +1706,8 @@ public class BspServiceMaster<I extends WritableComparable,
@Override
public void cleanup() throws IOException {
+ ImmutableClassesGiraphConfiguration conf = getConfiguration();
+
// All master processes should denote they are done by adding special
// znode. Once the number of znodes equals the number of partitions
// for workers and masters, the master will clean up the ZooKeeper
@@ -1744,9 +1739,7 @@ public class BspServiceMaster<I extends WritableComparable,
if (isMaster) {
cleanUpZooKeeper();
// If desired, cleanup the checkpoint directory
- if (getConfiguration().getBoolean(
- GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS,
- GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT)) {
+ if (GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.get(conf)) {
boolean success =
getFs().delete(new Path(checkpointBasePath), true);
if (LOG.isInfoEnabled()) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
index 40c6b74..ba2f8eb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
@@ -19,11 +19,10 @@
package org.apache.giraph.master;
import org.apache.giraph.bsp.ApplicationState;
+import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.SuperstepState;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.counters.GiraphTimers;
-import org.apache.giraph.bsp.BspService;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -34,6 +33,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
+import static org.apache.giraph.conf.GiraphConstants.USE_SUPERSTEP_COUNTERS;
+
/**
* Master thread that will coordinate the activities of the tasks. It runs
* on all task processes, however, will only execute its algorithm if it knows
@@ -76,9 +77,7 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
this.bspServiceMaster = bspServiceMaster;
this.context = context;
GiraphTimers.init(context);
- superstepCounterOn = context.getConfiguration().getBoolean(
- GiraphConstants.USE_SUPERSTEP_COUNTERS,
- GiraphConstants.USE_SUPERSTEP_COUNTERS_DEFAULT);
+ superstepCounterOn = USE_SUPERSTEP_COUNTERS.get(context.getConfiguration());
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
index 6bc9591..3525302 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -18,12 +18,6 @@
package org.apache.giraph.partition;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
@@ -32,6 +26,12 @@ import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.log4j.Logger;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -55,6 +55,9 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import static org.apache.giraph.conf.GiraphConstants.MAX_PARTITIONS_IN_MEMORY;
+import static org.apache.giraph.conf.GiraphConstants.PARTITIONS_DIRECTORY;
+
/**
* Disk-backed PartitionStore. Partitions are stored in memory on a LRU basis.
* Thread-safe, but expects the caller to synchronize between deletes, adds,
@@ -128,14 +131,10 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
this.conf = conf;
this.context = context;
// We must be able to hold at least one partition in memory
- maxInMemoryPartitions = Math.max(1,
- conf.getInt(GiraphConstants.MAX_PARTITIONS_IN_MEMORY,
- GiraphConstants.MAX_PARTITIONS_IN_MEMORY_DEFAULT));
+ maxInMemoryPartitions = Math.max(MAX_PARTITIONS_IN_MEMORY.get(conf), 1);
// Take advantage of multiple disks
- String[] userPaths = conf.getStrings(
- GiraphConstants.PARTITIONS_DIRECTORY,
- GiraphConstants.PARTITIONS_DIRECTORY_DEFAULT);
+ String[] userPaths = PARTITIONS_DIRECTORY.getArray(conf);
basePaths = new String[userPaths.length];
int i = 0;
for (String path : userPaths) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
index c83ca45..fc75006 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
@@ -18,14 +18,15 @@
package org.apache.giraph.partition;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.log4j.Logger;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -34,6 +35,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import static org.apache.giraph.conf.GiraphConstants.USER_PARTITION_COUNT;
+
/**
* Helper class for {@link Partition} related operations.
*/
@@ -166,13 +169,10 @@ public class PartitionUtils {
"computePartitionCount: No available workers");
}
- int userPartitionCount = conf.getInt(GiraphConstants.USER_PARTITION_COUNT,
- GiraphConstants.DEFAULT_USER_PARTITION_COUNT);
+ int userPartitionCount = USER_PARTITION_COUNT.get(conf);
int partitionCount;
- if (userPartitionCount == GiraphConstants.DEFAULT_USER_PARTITION_COUNT) {
- float multiplier = conf.getFloat(
- GiraphConstants.PARTITION_COUNT_MULTIPLIER,
- GiraphConstants.DEFAULT_PARTITION_COUNT_MULTIPLIER);
+ if (userPartitionCount == USER_PARTITION_COUNT.getDefaultValue()) {
+ float multiplier = GiraphConstants.PARTITION_COUNT_MULTIPLIER.get(conf);
partitionCount =
Math.max((int) (multiplier * availableWorkerInfos.size() *
availableWorkerInfos.size()),
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
index ae8556f..23e0f05 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
@@ -18,7 +18,6 @@
package org.apache.giraph.partition;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -33,6 +32,8 @@ import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import static org.apache.giraph.conf.GiraphConstants.USE_OUT_OF_CORE_MESSAGES;
+
/**
* A simple map-based container that stores vertices. Vertex ids will map to
* exactly one partition.
@@ -57,8 +58,7 @@ public class SimplePartition<I extends WritableComparable,
@Override
public void initialize(int partitionId, Progressable progressable) {
super.initialize(partitionId, progressable);
- if (getConf().getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
- GiraphConstants.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
+ if (USE_OUT_OF_CORE_MESSAGES.get(getConf())) {
vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
} else {
vertexMap = Maps.newConcurrentMap();
@@ -114,8 +114,7 @@ public class SimplePartition<I extends WritableComparable,
@Override
public void readFields(DataInput input) throws IOException {
super.readFields(input);
- if (getConf().getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
- GiraphConstants.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
+ if (USE_OUT_OF_CORE_MESSAGES.get(getConf())) {
vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
} else {
vertexMap = Maps.newConcurrentMap();
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index 2bba672..4b03127 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -151,15 +151,15 @@ public class InternalVertexRunner {
}
conf.setWorkerConfiguration(1, 1, 100.0f);
- conf.setBoolean(GiraphConstants.SPLIT_MASTER_WORKER, false);
- conf.setBoolean(GiraphConstants.LOCAL_TEST_MODE, true);
+ GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
+ GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
conf.set(GiraphConstants.ZOOKEEPER_LIST, "localhost:" +
String.valueOf(LOCAL_ZOOKEEPER_PORT));
conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
- conf.set(GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY,
+ GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
zkMgrDir.toString());
- conf.set(GiraphConstants.CHECKPOINT_DIRECTORY, checkpointsDir.toString());
+ GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString());
for (Map.Entry<String, String> param : params.entrySet()) {
conf.set(param.getKey(), param.getValue());
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
index 463510f..aedee6c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
@@ -98,10 +98,8 @@ public class JMapHistoDumper implements MasterObserver, WorkerObserver {
@Override
public void setConf(ImmutableClassesGiraphConfiguration configuration) {
- sleepMillis = configuration.getInt(GiraphConstants.JMAP_SLEEP_MILLIS,
- GiraphConstants.JMAP_SLEEP_MILLIS_DEFAULT);
- linesToPrint = configuration.getInt(GiraphConstants.JMAP_PRINT_LINES,
- GiraphConstants.JMAP_PRINT_LINES_DEFAULT);
+ sleepMillis = GiraphConstants.JMAP_SLEEP_MILLIS.get(configuration);
+ linesToPrint = GiraphConstants.JMAP_PRINT_LINES.get(configuration);
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/zk/GiraphZooKeeperAdmin.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/GiraphZooKeeperAdmin.java b/giraph-core/src/main/java/org/apache/giraph/zk/GiraphZooKeeperAdmin.java
index 7589a09..19f6be5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/GiraphZooKeeperAdmin.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/GiraphZooKeeperAdmin.java
@@ -18,20 +18,22 @@
package org.apache.giraph.zk;
-import static java.lang.System.out;
-
-import java.net.UnknownHostException;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
import org.apache.giraph.bsp.BspService;
-import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.List;
+
+import static java.lang.System.out;
+import static org.apache.giraph.conf.GiraphConstants.ZOOKEEPER_SERVER_PORT;
/**
* A Utility class to be used by Giraph admins to occasionally clean up the
@@ -74,15 +76,13 @@ public class GiraphZooKeeperAdmin implements Watcher, Tool {
*/
@Override
public int run(String[] args) {
- final int zkPort = getConf().getInt(
- GiraphConfiguration.ZOOKEEPER_SERVER_PORT,
- GiraphConfiguration.ZOOKEEPER_SERVER_PORT_DEFAULT);
+ final int zkPort = ZOOKEEPER_SERVER_PORT.get(getConf());
final String zkBasePath = getConf().get(
- GiraphConfiguration.BASE_ZNODE_KEY, "") + BspService.BASE_DIR;
+ GiraphConstants.BASE_ZNODE_KEY, "") + BspService.BASE_DIR;
final String[] zkServerList;
try {
zkServerList = getConf()
- .get(GiraphConfiguration.ZOOKEEPER_LIST).split(",");
+ .get(GiraphConstants.ZOOKEEPER_LIST).split(",");
} catch (NullPointerException npe) {
throw new IllegalStateException("GiraphZooKeeperAdmin requires a list " +
"of ZooKeeper servers to clean.");
@@ -94,9 +94,9 @@ public class GiraphZooKeeperAdmin implements Watcher, Tool {
try {
ZooKeeperExt zooKeeper = new ZooKeeperExt(
formatZkServerList(zkServerList, zkPort),
- GiraphConfiguration.ZOOKEEPER_SESSION_TIMEOUT_DEFAULT,
- GiraphConfiguration.ZOOKEEPER_OPS_MAX_ATTEMPTS_DEFAULT,
- GiraphConfiguration.ZOOKEEPER_SERVERLIST_POLL_MSECS_DEFAULT,
+ GiraphConstants.ZOOKEEPER_SESSION_TIMEOUT.getDefaultValue(),
+ GiraphConstants.ZOOKEEPER_OPS_MAX_ATTEMPTS.getDefaultValue(),
+ GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.getDefaultValue(),
this);
doZooKeeperCleanup(zooKeeper, zkBasePath);
return 0;
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
index add57fc..82a7fc1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
@@ -52,6 +52,7 @@ import java.util.List;
import java.util.Map;
import static org.apache.giraph.conf.GiraphConstants.BASE_ZNODE_KEY;
+import static org.apache.giraph.conf.GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY;
/**
@@ -141,7 +142,7 @@ public class ZooKeeperManager {
taskPartition = conf.getTaskPartition();
jobId = conf.get("mapred.job.id", "Unknown Job");
baseDirectory =
- new Path(conf.get(GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY,
+ new Path(ZOOKEEPER_MANAGER_DIRECTORY.getWithDefault(conf,
getFinalZooKeeperPath()));
taskDirectory = new Path(baseDirectory,
"_task");
@@ -150,25 +151,19 @@ public class ZooKeeperManager {
myClosedPath = new Path(taskDirectory,
Integer.toString(taskPartition) +
COMPUTATION_DONE_SUFFIX);
- pollMsecs = conf.getInt(
- GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS,
- GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS_DEFAULT);
- serverCount = conf.getInt(
- GiraphConstants.ZOOKEEPER_SERVER_COUNT,
- GiraphConstants.ZOOKEEPER_SERVER_COUNT_DEFAULT);
+ pollMsecs = GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.get(conf);
+ serverCount = GiraphConstants.ZOOKEEPER_SERVER_COUNT.get(conf);
String jobLocalDir = conf.get("job.local.dir");
if (jobLocalDir != null) { // for non-local jobs
zkDirDefault = jobLocalDir +
"/_bspZooKeeper";
} else {
zkDirDefault = System.getProperty("user.dir") + "/" +
- GiraphConstants.ZOOKEEPER_MANAGER_DIR_DEFAULT;
+ ZOOKEEPER_MANAGER_DIRECTORY.getDefaultValue();
}
zkDir = conf.get(GiraphConstants.ZOOKEEPER_DIR, zkDirDefault);
configFilePath = zkDir + "/zoo.cfg";
- zkBasePort = conf.getInt(
- GiraphConstants.ZOOKEEPER_SERVER_PORT,
- GiraphConstants.ZOOKEEPER_SERVER_PORT_DEFAULT);
+ zkBasePort = GiraphConstants.ZOOKEEPER_SERVER_PORT.get(conf);
myHostname = conf.getLocalHostname();
fs = FileSystem.get(conf);
@@ -180,7 +175,7 @@ public class ZooKeeperManager {
* @return directory path with job id
*/
private String getFinalZooKeeperPath() {
- return GiraphConstants.ZOOKEEPER_MANAGER_DIR_DEFAULT + "/" + jobId;
+ return ZOOKEEPER_MANAGER_DIRECTORY.getDefaultValue() + "/" + jobId;
}
/**
@@ -305,7 +300,7 @@ public class ZooKeeperManager {
"for base directory " + baseDirectory + ". If there is an " +
"issue with this directory, please set an accesible " +
"base directory with the Hadoop configuration option " +
- GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY);
+ ZOOKEEPER_MANAGER_DIRECTORY.getKey());
}
Path myCandidacyPath = new Path(
@@ -627,9 +622,7 @@ public class ZooKeeperManager {
"onlineZooKeeperServers: java.home is not set!");
}
commandList.add(javaHome + "/bin/java");
- String zkJavaOptsString =
- conf.get(GiraphConstants.ZOOKEEPER_JAVA_OPTS,
- GiraphConstants.ZOOKEEPER_JAVA_OPTS_DEFAULT);
+ String zkJavaOptsString = GiraphConstants.ZOOKEEPER_JAVA_OPTS.get(conf);
String[] zkJavaOptsArray = zkJavaOptsString.split(" ");
if (zkJavaOptsArray != null) {
commandList.addAll(Arrays.asList(zkJavaOptsArray));
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/test/java/org/apache/giraph/BspCase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/BspCase.java b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
index a6eef20..4c74d3f 100644
--- a/giraph-core/src/test/java/org/apache/giraph/BspCase.java
+++ b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
@@ -98,11 +98,11 @@ public class BspCase implements Watcher {
"location " + getJarLocation() + " for " + getName());
conf.setWorkerConfiguration(1, 1, 100.0f);
// Single node testing
- conf.setBoolean(GiraphConstants.SPLIT_MASTER_WORKER, false);
+ GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
}
conf.setMaxMasterSuperstepWaitMsecs(30 * 1000);
conf.setEventWaitMsecs(3 * 1000);
- conf.setInt(GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS, 500);
+ GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.set(conf, 500);
if (getZooKeeperList() != null) {
conf.setZooKeeperConfiguration(getZooKeeperList());
}
@@ -120,9 +120,9 @@ public class BspCase implements Watcher {
FileUtils.deletePath(conf, checkPointDir);
conf.set(GiraphConstants.ZOOKEEPER_DIR, zookeeperDir.toString());
- conf.set(GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY,
+ GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
zkManagerDir.toString());
- conf.set(GiraphConstants.CHECKPOINT_DIRECTORY, checkPointDir.toString());
+ GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkPointDir.toString());
return conf;
}
[3/3] git commit: GIRAPH-587: Refactor configuration options (nitay)
Posted by ni...@apache.org.
GIRAPH-587: Refactor configuration options (nitay)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/01c527e2
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/01c527e2
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/01c527e2
Branch: refs/heads/trunk
Commit: 01c527e225336d68a30e447889b37b6187f6286c
Parents: 460198a
Author: Nitay Joffe <ni...@apache.org>
Authored: Wed Mar 27 20:01:56 2013 -0400
Committer: Nitay Joffe <ni...@apache.org>
Committed: Wed Mar 27 20:02:15 2013 -0400
----------------------------------------------------------------------
CHANGELOG | 2 +
checkstyle.xml | 1 +
.../apache/giraph/benchmark/PageRankBenchmark.java | 2 +-
.../giraph/benchmark/ShortestPathsBenchmark.java | 2 +-
.../java/org/apache/giraph/bsp/BspInputFormat.java | 8 +-
.../java/org/apache/giraph/bsp/BspService.java | 8 +-
.../java/org/apache/giraph/comm/SendEdgeCache.java | 12 +-
.../org/apache/giraph/comm/SendMessageCache.java | 12 +-
.../java/org/apache/giraph/comm/ServerData.java | 17 +-
.../comm/messages/SequentialFileMessageStore.java | 10 +-
.../org/apache/giraph/comm/netty/NettyClient.java | 54 +-
.../org/apache/giraph/comm/netty/NettyServer.java | 38 +-
.../netty/NettyWorkerClientRequestProcessor.java | 42 +-
.../giraph/comm/netty/NettyWorkerServer.java | 12 +-
.../giraph/comm/netty/handler/RequestEncoder.java | 10 +-
.../comm/netty/handler/RequestServerHandler.java | 7 +-
.../comm/netty/handler/ResponseClientHandler.java | 7 +-
.../comm/netty/handler/SaslServerHandler.java | 7 +-
.../org/apache/giraph/conf/AbstractConfOption.java | 92 +++
.../java/org/apache/giraph/conf/AllOptions.java | 82 ++
.../org/apache/giraph/conf/BooleanConfOption.java | 96 +++
.../org/apache/giraph/conf/ClassConfOption.java | 182 +++++
.../org/apache/giraph/conf/ConfOptionType.java | 36 +
.../org/apache/giraph/conf/FloatConfOption.java | 80 ++
.../java/org/apache/giraph/conf/GiraphClasses.java | 46 +-
.../apache/giraph/conf/GiraphConfiguration.java | 211 ++----
.../org/apache/giraph/conf/GiraphConstants.java | 623 +++++++--------
.../conf/ImmutableClassesGiraphConfiguration.java | 5 +-
.../java/org/apache/giraph/conf/IntConfOption.java | 91 +++
.../org/apache/giraph/conf/LongConfOption.java | 80 ++
.../java/org/apache/giraph/conf/StrConfOption.java | 110 +++
.../org/apache/giraph/graph/GraphTaskManager.java | 26 +-
.../giraph/job/GiraphConfigurationValidator.java | 24 +-
.../main/java/org/apache/giraph/job/GiraphJob.java | 2 +-
.../org/apache/giraph/master/BspServiceMaster.java | 45 +-
.../org/apache/giraph/master/MasterThread.java | 9 +-
.../giraph/partition/DiskBackedPartitionStore.java | 23 +-
.../apache/giraph/partition/PartitionUtils.java | 16 +-
.../apache/giraph/partition/SimplePartition.java | 9 +-
.../apache/giraph/utils/InternalVertexRunner.java | 8 +-
.../org/apache/giraph/utils/JMapHistoDumper.java | 6 +-
.../org/apache/giraph/zk/GiraphZooKeeperAdmin.java | 32 +-
.../org/apache/giraph/zk/ZooKeeperManager.java | 25 +-
.../src/test/java/org/apache/giraph/BspCase.java | 8 +-
.../org/apache/giraph/comm/RequestFailureTest.java | 17 +-
.../java/org/apache/giraph/comm/RequestTest.java | 12 +-
.../org/apache/giraph/comm/SaslConnectionTest.java | 7 +-
.../giraph/conf/TestGiraphConfiguration.java | 7 +-
.../org/apache/giraph/conf/TestObjectCreation.java | 16 +-
.../apache/giraph/master/TestMasterObserver.java | 9 +-
.../giraph/partition/TestPartitionStores.java | 11 +-
.../java/org/apache/giraph/TestAutoCheckpoint.java | 8 +-
.../test/java/org/apache/giraph/TestBspBasic.java | 5 +-
.../org/apache/giraph/TestManualCheckpoint.java | 14 +-
.../org/apache/giraph/TestPartitionContext.java | 3 +-
.../aggregators/TestAggregatorsHandling.java | 28 +-
.../org/apache/giraph/examples/TestPageRank.java | 3 +-
.../examples/TryMultiIpcBindingPortsTest.java | 3 +-
.../org/apache/giraph/vertex/TestVertexTypes.java | 124 +--
59 files changed, 1535 insertions(+), 950 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 530d9ad..051c9a7 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-587: Refactor configuration options (nitay)
+
GIRAPH-581: More flexible Hive output (majakabiljo)
GIRAPH-579: Make it possible to use different out-edges data structures
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle.xml b/checkstyle.xml
index 3d8a6d4..370c120 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -221,6 +221,7 @@
<!-- Lines cannot exceed 80 chars -->
<module name="LineLength">
<property name="max" value="80"/>
+ <property name="ignorePattern" value="^import"/>
</module>
<!-- Over time, we will revise this down -->
<module name="MethodLength">
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
index 2902fa9..b5d7e1e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
@@ -209,7 +209,7 @@ public class PageRankBenchmark implements Tool {
}
LOG.info("Using edges class " +
- configuration.get(GiraphConstants.VERTEX_EDGES_CLASS));
+ GiraphConstants.VERTEX_EDGES_CLASS.get(configuration));
if (!cmd.hasOption('t') ||
(Integer.parseInt(cmd.getOptionValue('t')) == 1)) {
configuration.setVertexCombinerClass(DoubleSumCombiner.class);
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
index 1753f4f..c3c714e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
@@ -115,7 +115,7 @@ public class ShortestPathsBenchmark implements Tool {
job.getConfiguration().setVertexEdgesClass(HashMapEdges.class);
}
LOG.info("Using class " +
- job.getConfiguration().get(GiraphConstants.VERTEX_CLASS));
+ GiraphConstants.VERTEX_CLASS.get(job.getConfiguration()));
job.getConfiguration().setVertexInputFormatClass(
PseudoRandomVertexInputFormat.class);
if (!cmd.hasOption("nc")) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
index bce84b1..cc53271 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
@@ -50,14 +50,10 @@ public class BspInputFormat extends InputFormat<Text, Text> {
*/
public static int getMaxTasks(Configuration conf) {
int maxWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, 0);
- boolean splitMasterWorker =
- conf.getBoolean(GiraphConstants.SPLIT_MASTER_WORKER,
- GiraphConstants.SPLIT_MASTER_WORKER_DEFAULT);
+ boolean splitMasterWorker = GiraphConstants.SPLIT_MASTER_WORKER.get(conf);
int maxTasks = maxWorkers;
if (splitMasterWorker) {
- int zkServers =
- conf.getInt(GiraphConstants.ZOOKEEPER_SERVER_COUNT,
- GiraphConstants.ZOOKEEPER_SERVER_COUNT_DEFAULT);
+ int zkServers = GiraphConstants.ZOOKEEPER_SERVER_COUNT.get(conf);
maxTasks += zkServers;
}
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 969e2a5..187d111 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -52,6 +52,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import static org.apache.giraph.conf.GiraphConstants.CHECKPOINT_DIRECTORY;
+
/**
* Zookeeper-based implementation of {@link CentralizedService}.
*
@@ -311,9 +313,9 @@ public abstract class BspService<I extends WritableComparable,
EDGE_INPUT_SPLITS_ALL_READY_NODE, EDGE_INPUT_SPLITS_ALL_DONE_NODE);
applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
cleanedUpPath = basePath + CLEANED_UP_DIR;
- checkpointBasePath = getConfiguration().get(
- GiraphConstants.CHECKPOINT_DIRECTORY,
- GiraphConstants.CHECKPOINT_DIRECTORY_DEFAULT + "/" + getJobId());
+ checkpointBasePath =
+ CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(),
+ CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + getJobId());
masterElectionPath = basePath + MASTER_ELECTION_DIR;
if (LOG.isInfoEnabled()) {
LOG.info("BspService: Connecting to ZooKeeper with job " + jobId +
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
index 679cf6f..fbc911f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
@@ -19,7 +19,6 @@
package org.apache.giraph.comm;
import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.utils.ByteArrayVertexIdEdges;
@@ -28,6 +27,9 @@ import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import static org.apache.giraph.conf.GiraphConstants.ADDITIONAL_EDGE_REQUEST_SIZE;
+import static org.apache.giraph.conf.GiraphConstants.MAX_EDGE_REQUEST_SIZE;
+
/**
* Aggregates the edges to be sent to workers so they can be sent
* in bulk. Not thread-safe.
@@ -45,12 +47,8 @@ public class SendEdgeCache<I extends WritableComparable, E extends Writable>
*/
public SendEdgeCache(ImmutableClassesGiraphConfiguration conf,
CentralizedServiceWorker<?, ?, ?, ?> serviceWorker) {
- super(conf,
- serviceWorker,
- conf.getInt(GiraphConstants.MAX_EDGE_REQUEST_SIZE,
- GiraphConstants.MAX_EDGE_REQUEST_SIZE_DEFAULT),
- conf.getFloat(GiraphConstants.ADDITIONAL_EDGE_REQUEST_SIZE,
- GiraphConstants.ADDITIONAL_EDGE_REQUEST_SIZE_DEFAULT));
+ super(conf, serviceWorker, MAX_EDGE_REQUEST_SIZE.get(conf),
+ ADDITIONAL_EDGE_REQUEST_SIZE.get(conf));
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
index 07dc380..7d2a888 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
@@ -19,7 +19,6 @@
package org.apache.giraph.comm;
import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.PairList;
@@ -27,6 +26,9 @@ import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import static org.apache.giraph.conf.GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE;
+import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE;
+
/**
* Aggregates the messages to be sent to workers so they can be sent
* in bulk. Not thread-safe.
@@ -44,12 +46,8 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
*/
public SendMessageCache(ImmutableClassesGiraphConfiguration conf,
CentralizedServiceWorker<?, ?, ?, ?> serviceWorker) {
- super(conf,
- serviceWorker,
- conf.getInt(GiraphConstants.MAX_MSG_REQUEST_SIZE,
- GiraphConstants.MAX_MSG_REQUEST_SIZE_DEFAULT),
- conf.getFloat(GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE,
- GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE_DEFAULT));
+ super(conf, serviceWorker, MAX_MSG_REQUEST_SIZE.get(conf),
+ ADDITIONAL_MSG_REQUEST_SIZE.get(conf));
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 70dc156..743a6f8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -84,13 +84,13 @@ public class ServerData<I extends WritableComparable,
* Constructor.
*
* @param service Service worker
- * @param configuration Configuration
+ * @param conf Configuration
* @param messageStoreFactory Factory for message stores
* @param context Mapper context
*/
public ServerData(
CentralizedServiceWorker<I, V, E, M> service,
- ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+ ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
messageStoreFactory,
Mapper<?, ?, ?, ?>.Context context) {
@@ -98,17 +98,16 @@ public class ServerData<I extends WritableComparable,
this.messageStoreFactory = messageStoreFactory;
currentMessageStore = messageStoreFactory.newStore();
incomingMessageStore = messageStoreFactory.newStore();
- if (configuration.getBoolean(GiraphConstants.USE_OUT_OF_CORE_GRAPH,
- GiraphConstants.USE_OUT_OF_CORE_GRAPH_DEFAULT)) {
+ if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) {
partitionStore =
- new DiskBackedPartitionStore<I, V, E, M>(configuration, context);
+ new DiskBackedPartitionStore<I, V, E, M>(conf, context);
} else {
partitionStore =
- new SimplePartitionStore<I, V, E, M>(configuration, context);
+ new SimplePartitionStore<I, V, E, M>(conf, context);
}
- edgeStore = new EdgeStore<I, V, E, M>(service, configuration, context);
- ownerAggregatorData = new OwnerAggregatorServerData(context, configuration);
- allAggregatorData = new AllAggregatorServerData(context, configuration);
+ edgeStore = new EdgeStore<I, V, E, M>(service, conf, context);
+ ownerAggregatorData = new OwnerAggregatorServerData(context, conf);
+ allAggregatorData = new AllAggregatorServerData(context, conf);
}
public EdgeStore<I, V, E, M> getEdgeStore() {
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
index bdc5435..3fe4430 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
@@ -45,6 +45,8 @@ import java.util.List;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.giraph.conf.GiraphConstants.MESSAGES_DIRECTORY;
+
/**
* Used for writing and reading a collection of messages to disk. {@link
* #addMessages(MessageStore<I, M>)} should be called only once with
@@ -377,9 +379,7 @@ public class SequentialFileMessageStore<I extends WritableComparable,
this.config = config;
String jobId = config.get("mapred.job.id", "Unknown Job");
int taskId = config.getTaskPartition();
- List<String> userPaths = Lists.newArrayList(config.getStrings(
- GiraphConstants.MESSAGES_DIRECTORY,
- GiraphConstants.MESSAGES_DIRECTORY_DEFAULT));
+ List<String> userPaths = MESSAGES_DIRECTORY.getList(config);
Collections.shuffle(userPaths);
directories = new String[userPaths.size()];
int i = 0;
@@ -389,9 +389,7 @@ public class SequentialFileMessageStore<I extends WritableComparable,
directories[i++] = directory;
new File(directory).mkdirs();
}
- this.bufferSize = config.getInt(
- GiraphConstants.MESSAGES_BUFFER_SIZE,
- GiraphConstants.MESSAGES_BUFFER_SIZE_DEFAULT);
+ this.bufferSize = GiraphConstants.MESSAGES_BUFFER_SIZE.get(config);
storeCounter = new AtomicInteger();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index af76410..30c32fd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -70,6 +70,15 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.giraph.conf.GiraphConstants.CLIENT_RECEIVE_BUFFER_SIZE;
+import static org.apache.giraph.conf.GiraphConstants.CLIENT_SEND_BUFFER_SIZE;
+import static org.apache.giraph.conf.GiraphConstants.MAX_REQUEST_MILLISECONDS;
+import static org.apache.giraph.conf.GiraphConstants.MAX_RESOLVE_ADDRESS_ATTEMPTS;
+import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_AFTER_HANDLER;
+import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_THREADS;
+import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER;
+import static org.apache.giraph.conf.GiraphConstants.NETTY_MAX_CONNECTION_FAILURES;
+import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
import static org.jboss.netty.channel.Channels.pipeline;
/**
@@ -175,15 +184,9 @@ public class NettyClient {
TaskInfo myTaskInfo) {
this.context = context;
this.myTaskInfo = myTaskInfo;
- this.channelsPerServer = conf.getInt(
- GiraphConstants.CHANNELS_PER_SERVER,
- GiraphConstants.DEFAULT_CHANNELS_PER_SERVER);
- sendBufferSize = conf.getInt(
- GiraphConstants.CLIENT_SEND_BUFFER_SIZE,
- GiraphConstants.DEFAULT_CLIENT_SEND_BUFFER_SIZE);
- receiveBufferSize = conf.getInt(
- GiraphConstants.CLIENT_RECEIVE_BUFFER_SIZE,
- GiraphConstants.DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE);
+ this.channelsPerServer = GiraphConstants.CHANNELS_PER_SERVER.get(conf);
+ sendBufferSize = CLIENT_SEND_BUFFER_SIZE.get(conf);
+ receiveBufferSize = CLIENT_RECEIVE_BUFFER_SIZE.get(conf);
limitNumberOfOpenRequests = conf.getBoolean(
LIMIT_NUMBER_OF_OPEN_REQUESTS,
@@ -200,39 +203,24 @@ public class NettyClient {
maxNumberOfOpenRequests = -1;
}
- maxRequestMilliseconds = conf.getInt(
- GiraphConstants.MAX_REQUEST_MILLISECONDS,
- GiraphConstants.MAX_REQUEST_MILLISECONDS_DEFAULT);
+ maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
- maxConnectionFailures = conf.getInt(
- GiraphConstants.NETTY_MAX_CONNECTION_FAILURES,
- GiraphConstants.NETTY_MAX_CONNECTION_FAILURES_DEFAULT);
+ maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
- waitingRequestMsecs = conf.getInt(
- GiraphConstants.WAITING_REQUEST_MSECS,
- GiraphConstants.WAITING_REQUEST_MSECS_DEFAULT);
+ waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
- maxPoolSize = conf.getInt(
- GiraphConstants.NETTY_CLIENT_THREADS,
- GiraphConstants.NETTY_CLIENT_THREADS_DEFAULT);
+ maxPoolSize = GiraphConstants.NETTY_CLIENT_THREADS.get(conf);
- maxResolveAddressAttempts = conf.getInt(
- GiraphConstants.MAX_RESOLVE_ADDRESS_ATTEMPTS,
- GiraphConstants.MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT);
+ maxResolveAddressAttempts = MAX_RESOLVE_ADDRESS_ATTEMPTS.get(conf);
clientRequestIdRequestInfoMap =
new MapMaker().concurrencyLevel(maxPoolSize).makeMap();
- handlerBeforeExecutionHandler = conf.get(
- GiraphConstants.NETTY_CLIENT_EXECUTION_AFTER_HANDLER,
- GiraphConstants.NETTY_CLIENT_EXECUTION_AFTER_HANDLER_DEFAULT);
- boolean useExecutionHandler = conf.getBoolean(
- GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER,
- GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER_DEFAULT);
+ handlerBeforeExecutionHandler =
+ NETTY_CLIENT_EXECUTION_AFTER_HANDLER.get(conf);
+ boolean useExecutionHandler = NETTY_CLIENT_USE_EXECUTION_HANDLER.get(conf);
if (useExecutionHandler) {
- int executionThreads = conf.getInt(
- GiraphConstants.NETTY_CLIENT_EXECUTION_THREADS,
- GiraphConstants.NETTY_CLIENT_EXECUTION_THREADS_DEFAULT);
+ int executionThreads = NETTY_CLIENT_EXECUTION_THREADS.get(conf);
executionHandler = new ExecutionHandler(
new MemoryAwareThreadPoolExecutor(
executionThreads, 1048576, 1048576, 1, TimeUnit.HOURS,
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
index f31dd4a..0bfc2d6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
@@ -60,6 +60,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import static org.apache.giraph.conf.GiraphConstants.MAX_IPC_PORT_BIND_ATTEMPTS;
import static org.jboss.netty.channel.Channels.pipeline;
/**
@@ -140,12 +141,8 @@ public class NettyServer {
this.saslServerHandlerFactory = new SaslServerHandler.Factory();
/*end[HADOOP_NON_SECURE]*/
this.myTaskInfo = myTaskInfo;
- sendBufferSize = conf.getInt(
- GiraphConstants.SERVER_SEND_BUFFER_SIZE,
- GiraphConstants.DEFAULT_SERVER_SEND_BUFFER_SIZE);
- receiveBufferSize = conf.getInt(
- GiraphConstants.SERVER_RECEIVE_BUFFER_SIZE,
- GiraphConstants.DEFAULT_SERVER_RECEIVE_BUFFER_SIZE);
+ sendBufferSize = GiraphConstants.SERVER_SEND_BUFFER_SIZE.get(conf);
+ receiveBufferSize = GiraphConstants.SERVER_RECEIVE_BUFFER_SIZE.get(conf);
workerRequestReservedMap = new WorkerRequestReservedMap(conf);
@@ -162,25 +159,21 @@ public class NettyServer {
throw new IllegalStateException("NettyServer: unable to get hostname");
}
- maxPoolSize = conf.getInt(
- GiraphConstants.NETTY_SERVER_THREADS,
- GiraphConstants.NETTY_SERVER_THREADS_DEFAULT);
+ maxPoolSize = GiraphConstants.NETTY_SERVER_THREADS.get(conf);
- tcpBacklog = conf.getInt(GiraphConstants.TCP_BACKLOG,
+ tcpBacklog = conf.getInt(GiraphConstants.TCP_BACKLOG.getKey(),
conf.getInt(GiraphConstants.MAX_WORKERS,
- GiraphConstants.TCP_BACKLOG_DEFAULT));
+ GiraphConstants.TCP_BACKLOG.getDefaultValue()));
channelFactory = new NioServerSocketChannelFactory(
bossExecutorService,
workerExecutorService,
maxPoolSize);
- handlerBeforeExecutionHandler = conf.get(
- GiraphConstants.NETTY_SERVER_EXECUTION_AFTER_HANDLER,
- GiraphConstants.NETTY_SERVER_EXECUTION_AFTER_HANDLER_DEFAULT);
- boolean useExecutionHandler = conf.getBoolean(
- GiraphConstants.NETTY_SERVER_USE_EXECUTION_HANDLER,
- GiraphConstants.NETTY_SERVER_USE_EXECUTION_HANDLER_DEFAULT);
+ handlerBeforeExecutionHandler =
+ GiraphConstants.NETTY_SERVER_EXECUTION_AFTER_HANDLER.get(conf);
+ boolean useExecutionHandler =
+ GiraphConstants.NETTY_SERVER_USE_EXECUTION_HANDLER.get(conf);
if (useExecutionHandler) {
int executionThreads = conf.getNettyServerExecutionThreads();
executionHandler = new ExecutionHandler(
@@ -304,16 +297,11 @@ public class NettyServer {
int numServers = conf.getInt(GiraphConstants.MAX_WORKERS, numTasks) + 1;
int portIncrementConstant =
(int) Math.pow(10, Math.ceil(Math.log10(numServers)));
- int bindPort = conf.getInt(GiraphConstants.IPC_INITIAL_PORT,
- GiraphConstants.IPC_INITIAL_PORT_DEFAULT) +
- taskId;
+ int bindPort = GiraphConstants.IPC_INITIAL_PORT.get(conf) + taskId;
int bindAttempts = 0;
- final int maxIpcPortBindAttempts =
- conf.getInt(GiraphConstants.MAX_IPC_PORT_BIND_ATTEMPTS,
- GiraphConstants.MAX_IPC_PORT_BIND_ATTEMPTS_DEFAULT);
+ final int maxIpcPortBindAttempts = MAX_IPC_PORT_BIND_ATTEMPTS.get(conf);
final boolean failFirstPortBindingAttempt =
- conf.getBoolean(GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT,
- GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT_DEFAULT);
+ GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT.get(conf);
// Simple handling of port collisions on the same machine while
// preserving debuggability from the port number alone.
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index e58030e..db4ff5d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -17,9 +17,6 @@
*/
package org.apache.giraph.comm.netty;
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.util.PercentGauge;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.SendEdgeCache;
@@ -37,9 +34,9 @@ import org.apache.giraph.comm.requests.SendWorkerEdgesRequest;
import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
import org.apache.giraph.comm.requests.WorkerRequest;
import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MetricNames;
@@ -49,16 +46,23 @@ import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.utils.ByteArrayVertexIdEdges;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.PairList;
-import org.apache.giraph.graph.Vertex;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.util.PercentGauge;
+
import java.io.IOException;
import java.util.Map;
+import static org.apache.giraph.conf.GiraphConstants.MAX_EDGE_REQUEST_SIZE;
+import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE;
+import static org.apache.giraph.conf.GiraphConstants.MAX_MUTATIONS_PER_REQUEST;
+
/**
* Aggregates requests and sends them to the thread-safe NettyClient. This
* class is not thread-safe and is expected to be used and then thrown away after
@@ -112,30 +116,22 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
* Constructor.
*
* @param context Context
- * @param configuration Configuration
+ * @param conf Configuration
* @param serviceWorker Service worker
*/
public NettyWorkerClientRequestProcessor(
Mapper<?, ?, ?, ?>.Context context,
- ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+ ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
CentralizedServiceWorker<I, V, E, M> serviceWorker) {
this.workerClient = serviceWorker.getWorkerClient();
- this.configuration = configuration;
-
- sendPartitionCache = new SendPartitionCache<I, V, E, M>(context,
- configuration);
- sendMessageCache =
- new SendMessageCache<I, M>(configuration, serviceWorker);
- sendEdgeCache = new SendEdgeCache<I, E>(configuration, serviceWorker);
- maxMessagesSizePerWorker = configuration.getInt(
- GiraphConstants.MAX_MSG_REQUEST_SIZE,
- GiraphConstants.MAX_MSG_REQUEST_SIZE_DEFAULT);
- maxEdgesSizePerWorker = configuration.getInt(
- GiraphConstants.MAX_EDGE_REQUEST_SIZE,
- GiraphConstants.MAX_EDGE_REQUEST_SIZE_DEFAULT);
- maxMutationsPerPartition = configuration.getInt(
- GiraphConstants.MAX_MUTATIONS_PER_REQUEST,
- GiraphConstants.MAX_MUTATIONS_PER_REQUEST_DEFAULT);
+ this.configuration = conf;
+
+ sendPartitionCache = new SendPartitionCache<I, V, E, M>(context, conf);
+ sendMessageCache = new SendMessageCache<I, M>(conf, serviceWorker);
+ sendEdgeCache = new SendEdgeCache<I, E>(conf, serviceWorker);
+ maxMessagesSizePerWorker = MAX_MSG_REQUEST_SIZE.get(conf);
+ maxEdgesSizePerWorker = MAX_EDGE_REQUEST_SIZE.get(conf);
+ maxMutationsPerPartition = MAX_MUTATIONS_PER_REQUEST.get(conf);
this.serviceWorker = serviceWorker;
this.serverData = serviceWorker.getServerData();
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index 1fb0580..ed0861e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -31,7 +31,6 @@ import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.comm.messages.OneMessagePerVertexStore;
import org.apache.giraph.comm.messages.SequentialFileMessageStore;
import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.Vertex;
@@ -51,6 +50,9 @@ import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Map.Entry;
+import static org.apache.giraph.conf.GiraphConstants.MAX_MESSAGES_IN_MEMORY;
+import static org.apache.giraph.conf.GiraphConstants.USE_OUT_OF_CORE_MESSAGES;
+
/**
* Netty worker server that implements {@link WorkerServer} and contains
* the actual {@link ServerData}.
@@ -107,9 +109,7 @@ public class NettyWorkerServer<I extends WritableComparable,
*/
private MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
createMessageStoreFactory() {
- boolean useOutOfCoreMessaging = conf.getBoolean(
- GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
- GiraphConstants.USE_OUT_OF_CORE_MESSAGES_DEFAULT);
+ boolean useOutOfCoreMessaging = USE_OUT_OF_CORE_MESSAGES.get(conf);
if (!useOutOfCoreMessaging) {
if (conf.useCombiner()) {
if (LOG.isInfoEnabled()) {
@@ -126,9 +126,7 @@ public class NettyWorkerServer<I extends WritableComparable,
return ByteArrayMessagesPerVertexStore.newFactory(service, conf);
}
} else {
- int maxMessagesInMemory = conf.getInt(
- GiraphConstants.MAX_MESSAGES_IN_MEMORY,
- GiraphConstants.MAX_MESSAGES_IN_MEMORY_DEFAULT);
+ int maxMessagesInMemory = MAX_MESSAGES_IN_MEMORY.get(conf);
if (LOG.isInfoEnabled()) {
LOG.info("createMessageStoreFactory: Using DiskBackedMessageStore, " +
"maxMessagesInMemory = " + maxMessagesInMemory);
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
index 4e739cb..83b408e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
@@ -55,12 +55,10 @@ public class RequestEncoder extends OneToOneEncoder {
* @param conf Giraph configuration
*/
public RequestEncoder(GiraphConfiguration conf) {
- bufferStartingSize = conf.getInt(
- GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE,
- GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT);
- useDirectBuffers = conf.getBoolean(
- GiraphConstants.NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS,
- GiraphConstants.NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS_DEFAULT);
+ bufferStartingSize =
+ GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE.get(conf);
+ useDirectBuffers =
+ GiraphConstants.NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS.get(conf);
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
index bbf31c7..a02039e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
@@ -19,7 +19,6 @@
package org.apache.giraph.comm.netty.handler;
import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.TaskInfo;
import org.apache.giraph.time.SystemTime;
@@ -34,6 +33,8 @@ import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED;
+
/**
* Generic handler of requests.
*
@@ -71,9 +72,7 @@ public abstract class RequestServerHandler<R> extends
ImmutableClassesGiraphConfiguration conf,
TaskInfo myTaskInfo) {
this.workerRequestReservedMap = workerRequestReservedMap;
- closeFirstRequest = conf.getBoolean(
- GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED,
- GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT);
+ closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf);
this.myTaskInfo = myTaskInfo;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
index 1803be4..9f3f034 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
@@ -18,7 +18,6 @@
package org.apache.giraph.comm.netty.handler;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -32,6 +31,8 @@ import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
+import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_RESPONSE_FAILED;
+
/**
* Generic handler of responses.
*/
@@ -59,9 +60,7 @@ public class ResponseClientHandler extends SimpleChannelUpstreamHandler {
workerIdOutstandingRequestMap,
Configuration conf) {
this.workerIdOutstandingRequestMap = workerIdOutstandingRequestMap;
- dropFirstResponse = conf.getBoolean(
- GiraphConstants.NETTY_SIMULATE_FIRST_RESPONSE_FAILED,
- GiraphConstants.NETTY_SIMULATE_FIRST_RESPONSE_FAILED_DEFAULT);
+ dropFirstResponse = NETTY_SIMULATE_FIRST_RESPONSE_FAILED.get(conf);
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
index d06fd09..9644a5f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
@@ -24,7 +24,6 @@ import org.apache.giraph.comm.requests.RequestType;
import org.apache.giraph.comm.requests.SaslCompleteRequest;
import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.security.TokenCache;
@@ -45,6 +44,8 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.util.Collection;
+import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED;
+
/**
* Generate SASL response tokens to client SASL tokens, allowing clients to
* authenticate themselves with this server.
@@ -74,9 +75,7 @@ public class SaslServerHandler extends
Configuration conf) throws IOException {
SaslNettyServer.init(conf);
setupSecretManager(conf);
- closeFirstRequest = conf.getBoolean(
- GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED,
- GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT);
+ closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf);
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/AbstractConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/AbstractConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/AbstractConfOption.java
new file mode 100644
index 0000000..d00f7e9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/AbstractConfOption.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ComparisonChain;
+
+/**
+ * Abstract base class of configuration options, identified by a key and a type
+ */
+public abstract class AbstractConfOption
+ implements Comparable<AbstractConfOption> {
+ /** Logger (not referenced anywhere in this class) */
+ private static final Logger LOG = Logger.getLogger(AbstractConfOption.class);
+
+ /** Configuration key identifying this option; fixed at construction */
+ private final String key;
+
+ /**
+ * Constructor; only stores the key
+ * @param key configuration key
+ */
+ public AbstractConfOption(String key) {
+ this.key = key;
+ }
+
+ public String getKey() { // configuration key passed to the constructor
+ return key;
+ }
+
+ @Override public int compareTo(AbstractConfOption o) { // by type, then key
+ return ComparisonChain.start()
+ .compare(getType(), o.getType())
+ .compare(key, o.key)
+ .result();
+ }
+
+ @Override
+ public boolean equals(Object o) { // equal iff same type and same key
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof AbstractConfOption)) {
+ return false;
+ }
+
+ AbstractConfOption that = (AbstractConfOption) o;
+ return Objects.equal(getType(), that.getType()) &&
+ Objects.equal(key, that.key);
+ }
+
+ @Override
+ public int hashCode() { // key only; equal options always share a key
+ return Objects.hashCode(key);
+ }
+
+ @Override public String toString() { // " key => default (type)\n"
+ StringBuilder sb = new StringBuilder(30);
+ sb.append(" ").append(key).append(" => ").append(getDefaultValueStr());
+ sb.append(" (").append(getType().toString().toLowerCase()).append(")\n"); // NOTE(review): toLowerCase() uses the default locale
+ return sb.toString();
+ }
+
+ /**
+ * Get string representation of default value (printed by toString())
+ * @return default value rendered as a String
+ */
+ public abstract String getDefaultValueStr();
+
+ /**
+ * Get type this option holds; used for ordering, equality and toString()
+ * @return ConfOptionType
+ */
+ public abstract ConfOptionType getType();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java b/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
new file mode 100644
index 0000000..cceaaef
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_CLASS;
+
+/**
+ * Tracks all of the Giraph options
+ */
+public class AllOptions {
+ /** Logger */
+ private static final Logger LOG = Logger.getLogger(AllOptions.class);
+
+ /** Configuration options */
+ private static final List<AbstractConfOption> CONF_OPTIONS =
+ Lists.newArrayList();
+
+ /** Don't construct */
+ private AllOptions() { }
+
+ /**
+ * Add an option. Subclasses of {@link AbstractConfOption} should call this
+ * at the end of their constructor.
+ * @param confOption option
+ */
+ public static void add(AbstractConfOption confOption) {
+ CONF_OPTIONS.add(confOption);
+ }
+
+ /**
+ * String representation of all of the options stored
+ * @return string
+ */
+ public static String allOptionsString() {
+ Collections.sort(CONF_OPTIONS);
+ StringBuilder sb = new StringBuilder(CONF_OPTIONS.size() * 30);
+ sb.append("All Options:\n");
+ ConfOptionType lastType = null;
+ for (AbstractConfOption confOption : CONF_OPTIONS) {
+ if (!confOption.getType().equals(lastType)) {
+ sb.append(confOption.getType().toString().toLowerCase()).append(":\n");
+ lastType = confOption.getType();
+ }
+ sb.append(confOption);
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Command line utility to dump all Giraph options
+ * @param args cmdline args
+ */
+ public static void main(String[] args) {
+ // This is necessary to trigger the static constants in GiraphConstants to
+ // get loaded. Without it we get no output.
+ VERTEX_CLASS.toString();
+
+ LOG.info(allOptionsString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/BooleanConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/BooleanConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/BooleanConfOption.java
new file mode 100644
index 0000000..c16ec88
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/BooleanConfOption.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Boolean configuration option
+ */
+public class BooleanConfOption extends AbstractConfOption {
+ /** Default value */
+ private final boolean defaultValue;
+
+ /**
+ * Constructor
+ * @param key configuration key
+ * @param defaultValue default value
+ */
+ public BooleanConfOption(String key, boolean defaultValue) {
+ super(key);
+ this.defaultValue = defaultValue;
+ AllOptions.add(this);
+ }
+
+ public boolean isDefaultValue() {
+ return defaultValue;
+ }
+
+ @Override public String getDefaultValueStr() {
+ return Boolean.toString(defaultValue);
+ }
+
+ @Override public ConfOptionType getType() {
+ return ConfOptionType.BOOLEAN;
+ }
+
+ /**
+ * Lookup value in Configuration
+ * @param conf Configuration
+ * @return value for key in conf, or defaultValue if not present
+ */
+ public boolean get(Configuration conf) {
+ return conf.getBoolean(getKey(), defaultValue);
+ }
+
+ /**
+ * Check if value is false
+ * @param conf Configuration
+ * @return true if value (or the default, when unset) is false
+ */
+ public boolean isFalse(Configuration conf) {
+ return !get(conf);
+ }
+
+ /**
+ * Check if value is true
+ * @param conf Configuration
+ * @return true if value (or the default, when unset) is true
+ */
+ public boolean isTrue(Configuration conf) {
+ return get(conf);
+ }
+
+ /**
+ * Set value in configuration for this key
+ * @param conf Configuration
+ * @param value to set
+ */
+ public void set(Configuration conf, boolean value) {
+ conf.setBoolean(getKey(), value);
+ }
+
+ /**
+ * Set value in configuration if it hasn't been set already
+ * @param conf Configuration
+ * @param value to set
+ */
+ public void setIfUnset(Configuration conf, boolean value) {
+ conf.setBooleanIfUnset(getKey(), value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java
new file mode 100644
index 0000000..d67e0a5
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+/**
+ * Class configuration option
+ * @param <C> interface of class
+ */
+public class ClassConfOption<C> extends AbstractConfOption {
+ /** Logger */
+ private static final Logger LOG = Logger.getLogger(ClassConfOption.class);
+
+ /** Base interface for class */
+ private final Class<C> interfaceClass;
+ /** Default class if not set in configuration */
+ private final Class<? extends C> defaultClass;
+
+ /**
+ * Private constructor
+ * @param key Key
+ * @param defaultClass default class
+ * @param interfaceClass interface class
+ */
+ private ClassConfOption(String key, Class<? extends C> defaultClass,
+ Class<C> interfaceClass) {
+ super(key);
+ this.defaultClass = defaultClass;
+ this.interfaceClass = interfaceClass;
+ AllOptions.add(this);
+ }
+
+ /**
+ * Static create method
+ * @param key key
+ * @param defaultClass default class
+ * @param interfaceClass interface class
+ * @param <T> type of class
+ * @return ClassConfOption
+ */
+ public static <T> ClassConfOption<T> create(String key,
+ Class<? extends T> defaultClass, Class<T> interfaceClass) {
+ return new ClassConfOption<T>(key, defaultClass, interfaceClass);
+ }
+
+ public Class<? extends C> getDefaultClass() {
+ return defaultClass;
+ }
+
+ public Class<C> getInterfaceClass() {
+ return interfaceClass;
+ }
+
+ @Override public String getDefaultValueStr() {
+ return defaultClass == null ? "null" : defaultClass.getSimpleName();
+ }
+
+ @Override public ConfOptionType getType() {
+ return ConfOptionType.CLASS;
+ }
+
+ @Override public String toString() {
+ StringBuilder sb = new StringBuilder(30);
+ sb.append(" ");
+ sb.append(getKey()).append(" => ").append(getDefaultValueStr());
+ sb.append(" [").append(interfaceClass.getSimpleName()).append("] ");
+ sb.append(" (").append(getType().toString().toLowerCase()).append(")\n");
+ return sb.toString();
+ }
+
+ /**
+ * Lookup value
+ * @param conf Configuration
+ * @return Class set for key, or defaultClass
+ */
+ public Class<? extends C> get(Configuration conf) {
+ return conf.getClass(getKey(), defaultClass, interfaceClass);
+ }
+
+ /**
+ * Lookup array of classes for key
+ * @param conf Configuration
+ * @return array of classes
+ */
+ public Class<? extends C>[] getArray(Configuration conf) {
+ return getClassesOfType(conf, getKey(), interfaceClass);
+ }
+
+ /**
+ * Get classes from a property that all implement a given interface.
+ *
+ * @param conf Configuration
+ * @param name String name of property to fetch.
+ * @param xface interface classes must implement.
+ * @param defaultValue If not found, return this
+ * @param <T> Generic type of interface class
+ * @return array of Classes implementing interface specified.
+ */
+ public static <T> Class<? extends T>[] getClassesOfType(Configuration conf,
+ String name, Class<T> xface, Class<? extends T> ... defaultValue) {
+ Class<?>[] klasses = conf.getClasses(name, defaultValue);
+ for (Class<?> klass : klasses) {
+ if (!xface.isAssignableFrom(klass)) {
+ throw new RuntimeException(klass + " is not assignable from " +
+ xface.getName());
+ }
+ }
+ return (Class<? extends T>[]) klasses;
+ }
+
+ /**
+ * Lookup with user specified default value
+ * @param conf Configuration
+ * @param defaultValue default value
+ * @return Class
+ */
+ public Class<? extends C> getWithDefault(Configuration conf,
+ Class<? extends C> defaultValue) {
+ return conf.getClass(getKey(), defaultValue, interfaceClass);
+ }
+
+ /**
+ * Set value for key
+ * @param conf Configuration
+ * @param klass Class to set
+ */
+ public void set(Configuration conf, Class<? extends C> klass) {
+ conf.setClass(getKey(), klass, interfaceClass);
+ }
+
+ /**
+ * Add class to list for key
+ * @param conf Configuration
+ * @param klass Class to add
+ */
+ public void add(Configuration conf, Class<? extends C> klass) {
+ addToClasses(conf, getKey(), klass, interfaceClass);
+ }
+
+ /**
+ * Add a class to a property that is a list of classes. If the property does
+ * not exist it will be created.
+ *
+ * @param <T> type of class
+ * @param conf Configuration
+ * @param name String name of property.
+ * @param klass Class to add to the list.
+ * @param xface interface the class must implement.
+ */
+ public static <T> void addToClasses(Configuration conf, String name,
+ Class<? extends T> klass, Class<T> xface) {
+ if (!xface.isAssignableFrom(klass)) {
+ throw new RuntimeException(klass + " does not implement " +
+ xface.getName());
+ }
+ String value;
+ String klasses = conf.get(name);
+ if (klasses == null) {
+ value = klass.getName();
+ } else {
+ value = klasses + "," + klass.getName();
+ }
+ conf.set(name, value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/ConfOptionType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ConfOptionType.java b/giraph-core/src/main/java/org/apache/giraph/conf/ConfOptionType.java
new file mode 100644
index 0000000..8f70d90
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ConfOptionType.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+/**
+ * Type of value for a Configuration option
+ */
+public enum ConfOptionType {
+ /** boolean */
+ BOOLEAN,
+ /** class */
+ CLASS,
+ /** integer */
+ INTEGER,
+ /** float */
+ FLOAT,
+ /** long */
+ LONG,
+ /** string */
+ STRING;
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/FloatConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/FloatConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/FloatConfOption.java
new file mode 100644
index 0000000..fa21a28
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/FloatConfOption.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Float Configuration option
+ */
+public class FloatConfOption extends AbstractConfOption {
+ /** Default value */
+ private final float defaultValue;
+
+ /**
+ * Constructor
+ * @param key Configuration key
+ * @param defaultValue default value
+ */
+ public FloatConfOption(String key, float defaultValue) {
+ super(key);
+ this.defaultValue = defaultValue;
+ AllOptions.add(this);
+ }
+
+ public float getDefaultValue() {
+ return defaultValue;
+ }
+
+ @Override public String getDefaultValueStr() {
+ return Float.toString(defaultValue);
+ }
+
+ @Override public ConfOptionType getType() {
+ return ConfOptionType.FLOAT;
+ }
+
+ /**
+ * Lookup value
+ * @param conf Configuration
+ * @return value for key, or defaultValue if not present
+ */
+ public float get(Configuration conf) {
+ return conf.getFloat(getKey(), defaultValue);
+ }
+
+ /**
+ * Set value
+ * @param conf Configuration
+ * @param value to set
+ */
+ public void set(Configuration conf, float value) {
+ conf.setFloat(getKey(), value);
+ }
+
+ /**
+ * Set value if it's not already present
+ * @param conf Configuration
+ * @param value to set
+ */
+ public void setIfUnset(Configuration conf, float value) {
+ if (conf.get(getKey()) == null) {
+ conf.setFloat(getKey(), value);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index c13f3a2..23dab79 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -149,56 +149,42 @@ public class GiraphClasses<I extends WritableComparable,
*/
public void readFromConf(Configuration conf) {
// set pre-validated generic parameter types into Configuration
- vertexClass = (Class<? extends Vertex<I, V, E, M>>)
- conf.getClass(VERTEX_CLASS, null, Vertex.class);
+ vertexClass = (Class<? extends Vertex<I, V, E, M>>) VERTEX_CLASS.get(conf);
List<Class<?>> classList = ReflectionUtils.getTypeArguments(Vertex.class,
vertexClass);
vertexIdClass = (Class<I>) classList.get(0);
vertexValueClass = (Class<V>) classList.get(1);
edgeValueClass = (Class<E>) classList.get(2);
messageValueClass = (Class<M>) classList.get(3);
+
vertexEdgesClass = (Class<? extends VertexEdges<I, E>>)
- conf.getClass(VERTEX_EDGES_CLASS, ByteArrayEdges.class,
- VertexEdges.class);
+ VERTEX_EDGES_CLASS.get(conf);
inputVertexEdgesClass = (Class<? extends VertexEdges<I, E>>)
- conf.getClass(INPUT_VERTEX_EDGES_CLASS, vertexEdgesClass,
- VertexEdges.class);
+ INPUT_VERTEX_EDGES_CLASS.getWithDefault(conf, vertexEdgesClass);
vertexValueFactoryClass = (Class<? extends VertexValueFactory<V>>)
- conf.getClass(VERTEX_VALUE_FACTORY_CLASS,
- DefaultVertexValueFactory.class, VertexValueFactory.class);
+ VERTEX_VALUE_FACTORY_CLASS.get(conf);
graphPartitionerFactoryClass =
(Class<? extends GraphPartitionerFactory<I, V, E, M>>)
- conf.getClass(GRAPH_PARTITIONER_FACTORY_CLASS,
- HashPartitionerFactory.class,
- GraphPartitionerFactory.class);
+ GRAPH_PARTITIONER_FACTORY_CLASS.get(conf);
vertexInputFormatClass = (Class<? extends VertexInputFormat<I, V, E, M>>)
- conf.getClass(VERTEX_INPUT_FORMAT_CLASS,
- null, VertexInputFormat.class);
+ VERTEX_INPUT_FORMAT_CLASS.get(conf);
vertexOutputFormatClass = (Class<? extends VertexOutputFormat<I, V, E>>)
- conf.getClass(VERTEX_OUTPUT_FORMAT_CLASS,
- null, VertexOutputFormat.class);
+ VERTEX_OUTPUT_FORMAT_CLASS.get(conf);
edgeInputFormatClass = (Class<? extends EdgeInputFormat<I, E>>)
- conf.getClass(EDGE_INPUT_FORMAT_CLASS,
- null, EdgeInputFormat.class);
+ EDGE_INPUT_FORMAT_CLASS.get(conf);
- aggregatorWriterClass = conf.getClass(AGGREGATOR_WRITER_CLASS,
- TextAggregatorWriter.class, AggregatorWriter.class);
+ aggregatorWriterClass = AGGREGATOR_WRITER_CLASS.get(conf);
combinerClass = (Class<? extends Combiner<I, M>>)
- conf.getClass(VERTEX_COMBINER_CLASS, null, Combiner.class);
+ VERTEX_COMBINER_CLASS.get(conf);
vertexResolverClass = (Class<? extends VertexResolver<I, V, E, M>>)
- conf.getClass(VERTEX_RESOLVER_CLASS,
- DefaultVertexResolver.class, VertexResolver.class);
- partitionContextClass = conf.getClass(PARTITION_CONTEXT_CLASS,
- DefaultPartitionContext.class, PartitionContext.class);
- workerContextClass = conf.getClass(WORKER_CONTEXT_CLASS,
- DefaultWorkerContext.class, WorkerContext.class);
- masterComputeClass = conf.getClass(MASTER_COMPUTE_CLASS,
- DefaultMasterCompute.class, MasterCompute.class);
-
+ VERTEX_RESOLVER_CLASS.get(conf);
+ partitionContextClass = PARTITION_CONTEXT_CLASS.get(conf);
+ workerContextClass = WORKER_CONTEXT_CLASS.get(conf);
+ masterComputeClass = MASTER_COMPUTE_CLASS.get(conf);
partitionClass = (Class<? extends Partition<I, V, E, M>>)
- conf.getClass(PARTITION_CLASS, SimplePartition.class);
+ PARTITION_CLASS.get(conf);
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/01c527e2/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index ffcae6e..dee8e98 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -21,19 +21,18 @@ package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.Combiner;
import org.apache.giraph.edge.VertexEdges;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.VertexValueFactory;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
-import org.apache.giraph.job.DefaultJobObserver;
import org.apache.giraph.job.GiraphJobObserver;
import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.master.MasterObserver;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionContext;
-import org.apache.giraph.graph.Vertex;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerObserver;
import org.apache.hadoop.conf.Configuration;
@@ -72,7 +71,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setVertexClass(
Class<? extends Vertex> vertexClass) {
- setClass(VERTEX_CLASS, vertexClass, Vertex.class);
+ VERTEX_CLASS.set(this, vertexClass);
}
/**
@@ -82,8 +81,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setVertexValueFactoryClass(
Class<? extends VertexValueFactory> vertexValueFactoryClass) {
- setClass(VERTEX_VALUE_FACTORY_CLASS, vertexValueFactoryClass,
- VertexValueFactory.class);
+ VERTEX_VALUE_FACTORY_CLASS.set(this, vertexValueFactoryClass);
}
/**
@@ -93,7 +91,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setVertexEdgesClass(
Class<? extends VertexEdges> vertexEdgesClass) {
- setClass(VERTEX_EDGES_CLASS, vertexEdgesClass, VertexEdges.class);
+ VERTEX_EDGES_CLASS.set(this, vertexEdgesClass);
}
/**
@@ -104,8 +102,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setInputVertexEdgesClass(
Class<? extends VertexEdges> inputVertexEdgesClass) {
- setClass(INPUT_VERTEX_EDGES_CLASS, inputVertexEdgesClass,
- VertexEdges.class);
+ INPUT_VERTEX_EDGES_CLASS.set(this, inputVertexEdgesClass);
}
/**
@@ -115,9 +112,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setVertexInputFormatClass(
Class<? extends VertexInputFormat> vertexInputFormatClass) {
- setClass(VERTEX_INPUT_FORMAT_CLASS,
- vertexInputFormatClass,
- VertexInputFormat.class);
+ VERTEX_INPUT_FORMAT_CLASS.set(this, vertexInputFormatClass);
}
/**
@@ -127,9 +122,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setEdgeInputFormatClass(
Class<? extends EdgeInputFormat> edgeInputFormatClass) {
- setClass(EDGE_INPUT_FORMAT_CLASS,
- edgeInputFormatClass,
- EdgeInputFormat.class);
+ EDGE_INPUT_FORMAT_CLASS.set(this, edgeInputFormatClass);
}
/**
@@ -139,8 +132,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setMasterComputeClass(
Class<? extends MasterCompute> masterComputeClass) {
- setClass(MASTER_COMPUTE_CLASS, masterComputeClass,
- MasterCompute.class);
+ MASTER_COMPUTE_CLASS.set(this, masterComputeClass);
}
/**
@@ -150,8 +142,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void addMasterObserverClass(
Class<? extends MasterObserver> masterObserverClass) {
- addToClasses(MASTER_OBSERVER_CLASSES, masterObserverClass,
- MasterObserver.class);
+ MASTER_OBSERVER_CLASSES.add(this, masterObserverClass);
}
/**
@@ -161,8 +152,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void addWorkerObserverClass(
Class<? extends WorkerObserver> workerObserverClass) {
- addToClasses(WORKER_OBSERVER_CLASSES, workerObserverClass,
- WorkerObserver.class);
+ WORKER_OBSERVER_CLASSES.add(this, workerObserverClass);
}
/**
@@ -171,8 +161,7 @@ public class GiraphConfiguration extends Configuration
* @return GiraphJobObserver class set.
*/
public Class<? extends GiraphJobObserver> getJobObserverClass() {
- return getClass(JOB_OBSERVER_CLASS, DefaultJobObserver.class,
- GiraphJobObserver.class);
+ return JOB_OBSERVER_CLASS.get(this);
}
/**
@@ -181,7 +170,7 @@ public class GiraphConfiguration extends Configuration
* @param klass GiraphJobObserver class to set.
*/
public void setJobObserverClass(Class<? extends GiraphJobObserver> klass) {
- setClass(JOB_OBSERVER_CLASS, klass, GiraphJobObserver.class);
+ JOB_OBSERVER_CLASS.set(this, klass);
}
/**
@@ -190,30 +179,7 @@ public class GiraphConfiguration extends Configuration
* @return true if jmap dumper is enabled.
*/
public boolean isJMapHistogramDumpEnabled() {
- return getBoolean(JMAP_ENABLE, JMAP_ENABLE_DEFAULT);
- }
-
- /**
- * Add a class to a property that is a list of classes. If the property does
- * not exist it will be created.
- *
- * @param name String name of property.
- * @param klass interface of the class being set.
- * @param xface Class to add to the list.
- */
- public final void addToClasses(String name, Class<?> klass, Class<?> xface) {
- if (!xface.isAssignableFrom(klass)) {
- throw new RuntimeException(klass + " does not implement " +
- xface.getName());
- }
- String value;
- String klasses = get(name);
- if (klasses == null) {
- value = klass.getName();
- } else {
- value = klasses + "," + klass.getName();
- }
- set(name, value);
+ return JMAP_ENABLE.get(this);
}
/**
@@ -238,36 +204,13 @@ public class GiraphConfiguration extends Configuration
}
/**
- * Get classes from a property that all implement a given interface.
- *
- * @param name String name of property to fetch.
- * @param xface interface classes must implement.
- * @param defaultValue If not found, return this
- * @param <T> Generic type of interface class
- * @return array of Classes implementing interface specified.
- */
- public final <T> Class<? extends T>[] getClassesOfType(String name,
- Class<T> xface, Class<? extends T> ... defaultValue) {
- Class<?>[] klasses = getClasses(name, defaultValue);
- for (Class<?> klass : klasses) {
- if (!xface.isAssignableFrom(klass)) {
- throw new RuntimeException(klass + " is not assignable from " +
- xface.getName());
- }
- }
- return (Class<? extends T>[]) klasses;
- }
-
- /**
* Set the vertex output format class (optional)
*
* @param vertexOutputFormatClass Determines how graph is output
*/
public final void setVertexOutputFormatClass(
Class<? extends VertexOutputFormat> vertexOutputFormatClass) {
- setClass(VERTEX_OUTPUT_FORMAT_CLASS,
- vertexOutputFormatClass,
- VertexOutputFormat.class);
+ VERTEX_OUTPUT_FORMAT_CLASS.set(this, vertexOutputFormatClass);
}
/**
@@ -277,7 +220,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setVertexCombinerClass(
Class<? extends Combiner> vertexCombinerClass) {
- setClass(VERTEX_COMBINER_CLASS, vertexCombinerClass, Combiner.class);
+ VERTEX_COMBINER_CLASS.set(this, vertexCombinerClass);
}
/**
@@ -286,10 +229,8 @@ public class GiraphConfiguration extends Configuration
* @param graphPartitionerFactoryClass Determines how the graph is partitioned
*/
public final void setGraphPartitionerFactoryClass(
- Class<?> graphPartitionerFactoryClass) {
- setClass(GRAPH_PARTITIONER_FACTORY_CLASS,
- graphPartitionerFactoryClass,
- GraphPartitionerFactory.class);
+ Class<? extends GraphPartitionerFactory> graphPartitionerFactoryClass) {
+ GRAPH_PARTITIONER_FACTORY_CLASS.set(this, graphPartitionerFactoryClass);
}
/**
@@ -299,7 +240,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setVertexResolverClass(
Class<? extends VertexResolver> vertexResolverClass) {
- setClass(VERTEX_RESOLVER_CLASS, vertexResolverClass, VertexResolver.class);
+ VERTEX_RESOLVER_CLASS.set(this, vertexResolverClass);
}
/**
@@ -309,7 +250,7 @@ public class GiraphConfiguration extends Configuration
* @return true if we should create non existent vertices that get messages.
*/
public final boolean getResolverCreateVertexOnMessages() {
- return getBoolean(RESOLVER_CREATE_VERTEX_ON_MSGS, true);
+ return RESOLVER_CREATE_VERTEX_ON_MSGS.get(this);
}
/**
@@ -318,7 +259,7 @@ public class GiraphConfiguration extends Configuration
* @param v true if we should create vertices when they get messages.
*/
public final void setResolverCreateVertexOnMessages(boolean v) {
- setBoolean(RESOLVER_CREATE_VERTEX_ON_MSGS, v);
+ RESOLVER_CREATE_VERTEX_ON_MSGS.set(this, v);
}
/**
@@ -329,8 +270,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setPartitionContextClass(
Class<? extends PartitionContext> partitionContextClass) {
- setClass(PARTITION_CONTEXT_CLASS, partitionContextClass,
- PartitionContext.class);
+ PARTITION_CONTEXT_CLASS.set(this, partitionContextClass);
}
/**
@@ -341,7 +281,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setWorkerContextClass(
Class<? extends WorkerContext> workerContextClass) {
- setClass(WORKER_CONTEXT_CLASS, workerContextClass, WorkerContext.class);
+ WORKER_CONTEXT_CLASS.set(this, workerContextClass);
}
/**
@@ -352,9 +292,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setAggregatorWriterClass(
Class<? extends AggregatorWriter> aggregatorWriterClass) {
- setClass(AGGREGATOR_WRITER_CLASS,
- aggregatorWriterClass,
- AggregatorWriter.class);
+ AGGREGATOR_WRITER_CLASS.set(this, aggregatorWriterClass);
}
/**
@@ -364,9 +302,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setPartitionClass(
Class<? extends Partition> partitionClass) {
- setClass(PARTITION_CLASS,
- partitionClass,
- Partition.class);
+ PARTITION_CLASS.set(this, partitionClass);
}
/**
@@ -384,7 +320,7 @@ public class GiraphConfiguration extends Configuration
float minPercentResponded) {
setInt(MIN_WORKERS, minWorkers);
setInt(MAX_WORKERS, maxWorkers);
- setFloat(MIN_PERCENT_RESPONDED, minPercentResponded);
+ MIN_PERCENT_RESPONDED.set(this, minPercentResponded);
}
public final int getMinWorkers() {
@@ -396,7 +332,7 @@ public class GiraphConfiguration extends Configuration
}
public final float getMinPercentResponded() {
- return getFloat(MIN_PERCENT_RESPONDED, MIN_PERCENT_RESPONDED_DEFAULT);
+ return MIN_PERCENT_RESPONDED.get(this);
}
/**
@@ -411,7 +347,7 @@ public class GiraphConfiguration extends Configuration
}
public final boolean getSplitMasterWorker() {
- return getBoolean(SPLIT_MASTER_WORKER, SPLIT_MASTER_WORKER_DEFAULT);
+ return SPLIT_MASTER_WORKER.get(this);
}
/**
@@ -420,7 +356,7 @@ public class GiraphConfiguration extends Configuration
* @return array of MasterObserver classes.
*/
public Class<? extends MasterObserver>[] getMasterObserverClasses() {
- return getClassesOfType(MASTER_OBSERVER_CLASSES, MasterObserver.class);
+ return MASTER_OBSERVER_CLASSES.getArray(this);
}
/**
@@ -429,7 +365,7 @@ public class GiraphConfiguration extends Configuration
* @return array of WorkerObserver classes.
*/
public Class<? extends WorkerObserver>[] getWorkerObserverClasses() {
- return getClassesOfType(WORKER_OBSERVER_CLASSES, WorkerObserver.class);
+ return WORKER_OBSERVER_CLASSES.getArray(this);
}
/**
@@ -438,7 +374,7 @@ public class GiraphConfiguration extends Configuration
* @return true if metrics are enabled, false otherwise (default)
*/
public boolean metricsEnabled() {
- return getBoolean(METRICS_ENABLE, false);
+ return METRICS_ENABLE.isTrue(this);
}
/**
@@ -460,7 +396,7 @@ public class GiraphConfiguration extends Configuration
}
public String getLocalLevel() {
- return get(LOG_LEVEL, LOG_LEVEL_DEFAULT);
+ return LOG_LEVEL.get(this);
}
/**
@@ -469,15 +405,15 @@ public class GiraphConfiguration extends Configuration
* @return True if use the log thread layout option, false otherwise
*/
public boolean useLogThreadLayout() {
- return getBoolean(LOG_THREAD_LAYOUT, LOG_THREAD_LAYOUT_DEFAULT);
+ return LOG_THREAD_LAYOUT.get(this);
}
public boolean getLocalTestMode() {
- return getBoolean(LOCAL_TEST_MODE, LOCAL_TEST_MODE_DEFAULT);
+ return LOCAL_TEST_MODE.get(this);
}
public int getZooKeeperServerCount() {
- return getInt(ZOOKEEPER_SERVER_COUNT, ZOOKEEPER_SERVER_COUNT_DEFAULT);
+ return ZOOKEEPER_SERVER_COUNT.get(this);
}
/**
@@ -490,31 +426,27 @@ public class GiraphConfiguration extends Configuration
}
public int getZooKeeperSessionTimeout() {
- return getInt(ZOOKEEPER_SESSION_TIMEOUT, ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
+ return ZOOKEEPER_SESSION_TIMEOUT.get(this);
}
public int getZookeeperOpsMaxAttempts() {
- return getInt(ZOOKEEPER_OPS_MAX_ATTEMPTS,
- ZOOKEEPER_OPS_MAX_ATTEMPTS_DEFAULT);
+ return ZOOKEEPER_OPS_MAX_ATTEMPTS.get(this);
}
public int getZookeeperOpsRetryWaitMsecs() {
- return getInt(ZOOKEEPER_OPS_RETRY_WAIT_MSECS,
- ZOOKEEPER_OPS_RETRY_WAIT_MSECS_DEFAULT);
+ return ZOOKEEPER_OPS_RETRY_WAIT_MSECS.get(this);
}
public boolean getNettyServerUseExecutionHandler() {
- return getBoolean(NETTY_SERVER_USE_EXECUTION_HANDLER,
- NETTY_SERVER_USE_EXECUTION_HANDLER_DEFAULT);
+ return NETTY_SERVER_USE_EXECUTION_HANDLER.get(this);
}
public int getNettyServerThreads() {
- return getInt(NETTY_SERVER_THREADS, NETTY_SERVER_THREADS_DEFAULT);
+ return NETTY_SERVER_THREADS.get(this);
}
public int getNettyServerExecutionThreads() {
- return getInt(NETTY_SERVER_EXECUTION_THREADS,
- NETTY_SERVER_EXECUTION_THREADS_DEFAULT);
+ return NETTY_SERVER_EXECUTION_THREADS.get(this);
}
/**
@@ -532,26 +464,23 @@ public class GiraphConfiguration extends Configuration
}
public int getZookeeperConnectionAttempts() {
- return getInt(ZOOKEEPER_CONNECTION_ATTEMPTS,
- ZOOKEEPER_CONNECTION_ATTEMPTS_DEFAULT);
+ return ZOOKEEPER_CONNECTION_ATTEMPTS.get(this);
}
public int getZooKeeperMinSessionTimeout() {
- return getInt(ZOOKEEPER_MIN_SESSION_TIMEOUT,
- DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT);
+ return ZOOKEEPER_MIN_SESSION_TIMEOUT.get(this);
}
public int getZooKeeperMaxSessionTimeout() {
- return getInt(ZOOKEEPER_MAX_SESSION_TIMEOUT,
- DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT);
+ return ZOOKEEPER_MAX_SESSION_TIMEOUT.get(this);
}
public String getZooKeeperForceSync() {
- return get(ZOOKEEPER_FORCE_SYNC, DEFAULT_ZOOKEEPER_FORCE_SYNC);
+ return ZOOKEEPER_FORCE_SYNC.get(this);
}
public String getZooKeeperSkipAcl() {
- return get(ZOOKEEPER_SKIP_ACL, DEFAULT_ZOOKEEPER_SKIP_ACL);
+ return ZOOKEEPER_SKIP_ACL.get(this);
}
/**
@@ -574,7 +503,7 @@ public class GiraphConfiguration extends Configuration
* @return True if should authenticate, false otherwise
*/
public boolean authenticate() {
- return getBoolean(AUTHENTICATE, DEFAULT_AUTHENTICATE);
+ return AUTHENTICATE.get(this);
}
/**
@@ -583,11 +512,11 @@ public class GiraphConfiguration extends Configuration
* @param numComputeThreads Number of compute threads to use
*/
public void setNumComputeThreads(int numComputeThreads) {
- setInt(NUM_COMPUTE_THREADS, numComputeThreads);
+ NUM_COMPUTE_THREADS.set(this, numComputeThreads);
}
public int getNumComputeThreads() {
- return getInt(NUM_COMPUTE_THREADS, NUM_COMPUTE_THREADS_DEFAULT);
+ return NUM_COMPUTE_THREADS.get(this);
}
/**
@@ -596,19 +525,19 @@ public class GiraphConfiguration extends Configuration
* @param numInputSplitsThreads Number of input split threads to use
*/
public void setNumInputSplitsThreads(int numInputSplitsThreads) {
- setInt(NUM_INPUT_SPLITS_THREADS, numInputSplitsThreads);
+ NUM_INPUT_SPLITS_THREADS.set(this, numInputSplitsThreads);
}
public int getNumInputSplitsThreads() {
- return getInt(NUM_INPUT_SPLITS_THREADS, NUM_INPUT_SPLITS_THREADS_DEFAULT);
+ return NUM_INPUT_SPLITS_THREADS.get(this);
}
public long getInputSplitMaxVertices() {
- return getLong(INPUT_SPLIT_MAX_VERTICES, INPUT_SPLIT_MAX_VERTICES_DEFAULT);
+ return INPUT_SPLIT_MAX_VERTICES.get(this);
}
public long getInputSplitMaxEdges() {
- return getLong(INPUT_SPLIT_MAX_EDGES, INPUT_SPLIT_MAX_EDGES_DEFAULT);
+ return INPUT_SPLIT_MAX_EDGES.get(this);
}
/**
@@ -617,7 +546,7 @@ public class GiraphConfiguration extends Configuration
* @param useUnsafeSerialization If true, use unsafe serialization
*/
public void useUnsafeSerialization(boolean useUnsafeSerialization) {
- setBoolean(USE_UNSAFE_SERIALIZATION, useUnsafeSerialization);
+ USE_UNSAFE_SERIALIZATION.set(this, useUnsafeSerialization);
}
/**
@@ -627,8 +556,7 @@ public class GiraphConfiguration extends Configuration
* @return Whether to use message size encoding
*/
public boolean useMessageSizeEncoding() {
- return getBoolean(
- USE_MESSAGE_SIZE_ENCODING, USE_MESSAGE_SIZE_ENCODING_DEFAULT);
+ return USE_MESSAGE_SIZE_ENCODING.get(this);
}
/**
@@ -638,7 +566,7 @@ public class GiraphConfiguration extends Configuration
* @param checkpointFrequency How often to checkpoint (0 means never)
*/
public void setCheckpointFrequency(int checkpointFrequency) {
- setInt(CHECKPOINT_FREQUENCY, checkpointFrequency);
+ CHECKPOINT_FREQUENCY.set(this, checkpointFrequency);
}
/**
@@ -648,7 +576,7 @@ public class GiraphConfiguration extends Configuration
* @return Checkpoint frequency (0 means never)
*/
public int getCheckpointFrequency() {
- return getInt(CHECKPOINT_FREQUENCY, CHECKPOINT_FREQUENCY_DEFAULT);
+ return CHECKPOINT_FREQUENCY.get(this);
}
/**
@@ -666,7 +594,7 @@ public class GiraphConfiguration extends Configuration
* @param maxTaskAttempts Max task attempts to use
*/
public void setMaxTaskAttempts(int maxTaskAttempts) {
- setInt(MAX_TASK_ATTEMPTS, maxTaskAttempts);
+ MAX_TASK_ATTEMPTS.set(this, maxTaskAttempts);
}
/**
@@ -675,7 +603,7 @@ public class GiraphConfiguration extends Configuration
* @return Max task attempts or -1, if not set
*/
public int getMaxTaskAttempts() {
- return getInt(MAX_TASK_ATTEMPTS, -1);
+ return MAX_TASK_ATTEMPTS.get(this);
}
/**
@@ -684,7 +612,7 @@ public class GiraphConfiguration extends Configuration
* @return Number of milliseconds to wait for an event before continuing on
*/
public int getEventWaitMsecs() {
- return getInt(EVENT_WAIT_MSECS, EVENT_WAIT_MSECS_DEFAULT);
+ return EVENT_WAIT_MSECS.get(this);
}
/**
@@ -694,7 +622,7 @@ public class GiraphConfiguration extends Configuration
* continuing on
*/
public void setEventWaitMsecs(int eventWaitMsecs) {
- setInt(EVENT_WAIT_MSECS, eventWaitMsecs);
+ EVENT_WAIT_MSECS.set(this, eventWaitMsecs);
}
/**
@@ -705,8 +633,7 @@ public class GiraphConfiguration extends Configuration
* minimum number of workers before a superstep
*/
public int getMaxMasterSuperstepWaitMsecs() {
- return getInt(MAX_MASTER_SUPERSTEP_WAIT_MSECS,
- MAX_MASTER_SUPERSTEP_WAIT_MSECS_DEFAULT);
+ return MAX_MASTER_SUPERSTEP_WAIT_MSECS.get(this);
}
/**
@@ -718,7 +645,7 @@ public class GiraphConfiguration extends Configuration
* number of workers before a superstep
*/
public void setMaxMasterSuperstepWaitMsecs(int maxMasterSuperstepWaitMsecs) {
- setInt(MAX_MASTER_SUPERSTEP_WAIT_MSECS, maxMasterSuperstepWaitMsecs);
+ MAX_MASTER_SUPERSTEP_WAIT_MSECS.set(this, maxMasterSuperstepWaitMsecs);
}
/**
@@ -738,8 +665,7 @@ public class GiraphConfiguration extends Configuration
* @return True iff we want to use input split locality
*/
public boolean useInputSplitLocality() {
- return getBoolean(GiraphConstants.USE_INPUT_SPLIT_LOCALITY,
- GiraphConstants.USE_INPUT_SPLIT_LOCALITY_DEFAULT);
+ return USE_INPUT_SPLIT_LOCALITY.get(this);
}
/**
@@ -748,8 +674,7 @@ public class GiraphConfiguration extends Configuration
* @return True iff we can reuse incoming edge objects.
*/
public boolean reuseIncomingEdgeObjects() {
- return getBoolean(GiraphConstants.REUSE_INCOMING_EDGE_OBJECTS,
- GiraphConstants.REUSE_INCOMING_EDGE_OBJECTS_DEFAULT);
+ return GiraphConstants.REUSE_INCOMING_EDGE_OBJECTS.get(this);
}
/**
@@ -760,8 +685,8 @@ public class GiraphConfiguration extends Configuration
*/
public String getLocalHostname() throws UnknownHostException {
return DNS.getDefaultHost(
- get(GiraphConstants.DNS_INTERFACE, "default"),
- get(GiraphConstants.DNS_NAMESERVER, "default"));
+ GiraphConstants.DNS_INTERFACE.get(this),
+ GiraphConstants.DNS_NAMESERVER.get(this));
}
/**
@@ -771,7 +696,7 @@ public class GiraphConfiguration extends Configuration
* @param maxNumberOfSupersteps Maximum number of supersteps
*/
public void setMaxNumberOfSupersteps(int maxNumberOfSupersteps) {
- setInt(MAX_NUMBER_OF_SUPERSTEPS, maxNumberOfSupersteps);
+ MAX_NUMBER_OF_SUPERSTEPS.set(this, maxNumberOfSupersteps);
}
/**
@@ -781,6 +706,6 @@ public class GiraphConfiguration extends Configuration
* @return Maximum number of supersteps
*/
public int getMaxNumberOfSupersteps() {
- return getInt(MAX_NUMBER_OF_SUPERSTEPS, MAX_NUMBER_OF_SUPERSTEPS_DEFAULT);
+ return MAX_NUMBER_OF_SUPERSTEPS.get(this);
}
}