You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/09/25 19:40:21 UTC
svn commit: r1390014 [1/4] - in /giraph/trunk: ./
src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/comm/messages/ src/...
Author: aching
Date: Tue Sep 25 17:40:18 2012
New Revision: 1390014
URL: http://svn.apache.org/viewvc?rev=1390014&view=rev
Log:
GIRAPH-337: Make a specific Giraph configuration for class caching and
Giraph-specific configuration options
Added:
giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java
- copied, changed from r1388628, giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
giraph/trunk/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfigurable.java
- copied, changed from r1388628, giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java
giraph/trunk/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
giraph/trunk/src/main/java/org/apache/giraph/benchmark/EdgeListVertexPageRankBenchmark.java
- copied, changed from r1388628, giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/GiraphRunner.java
giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
giraph/trunk/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java
giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java
giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java
giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestReservedMap.java
giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMessagesRequest.java
giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
giraph/trunk/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java
giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java
giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java
giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java
giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java
giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java
giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java
giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleVertex.java
giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java
giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java
giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
giraph/trunk/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
giraph/trunk/src/main/java/org/apache/giraph/utils/WritableUtils.java
giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
giraph/trunk/src/test/java/org/apache/giraph/BspCase.java
giraph/trunk/src/test/java/org/apache/giraph/TestAggregatorsHandling.java
giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java
giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java
giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java
giraph/trunk/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
giraph/trunk/src/test/java/org/apache/giraph/TestVertexTypes.java
giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
giraph/trunk/src/test/java/org/apache/giraph/comm/RPCCommunicationsTest.java
giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
giraph/trunk/src/test/java/org/apache/giraph/comm/TestMessageStores.java
giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiRpcBindingPortsTest.java
giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java
giraph/trunk/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java
giraph/trunk/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
giraph/trunk/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
giraph/trunk/src/test/java/zk/TestZooKeeperManager.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Sep 25 17:40:18 2012
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-337: Make a specific Giraph configuration for Class caching
+ and specific Giraph configuration. (aching)
+
GIRAPH-334: Bugfix HCatalog Hive profile. (nitayj via aching)
GIRAPH-93: Hive input / output format. (nitayj via aching)
Copied: giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java (from r1388628, giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java)
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java?p2=giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java&p1=giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java&r1=1388628&r2=1390014&rev=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java Tue Sep 25 17:40:18 2012
@@ -16,28 +16,25 @@
* limitations under the License.
*/
-package org.apache.giraph.graph;
+package org.apache.giraph;
-import org.apache.giraph.bsp.BspInputFormat;
-import org.apache.giraph.bsp.BspOutputFormat;
+import org.apache.giraph.graph.AggregatorWriter;
+import org.apache.giraph.graph.MasterCompute;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexCombiner;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.graph.WorkerContext;
import org.apache.giraph.graph.partition.GraphPartitionerFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
/**
- * Generates an appropriate internal {@link Job} for using Giraph in Hadoop.
- * Uses composition to avoid unwanted {@link Job} methods from exposure
- * to the user.
+ * Adds user methods specific to Giraph. This will be put into an
+ * ImmutableClassesGiraphConfiguration that provides the configuration plus
+ * the immutable classes.
*/
-public class GiraphJob {
- static {
- Configuration.addDefaultResource("giraph-site.xml");
- }
-
+public class GiraphConfiguration extends Configuration {
/** Vertex class - required */
public static final String VERTEX_CLASS = "giraph.vertexClass";
/** VertexInputFormat class - required */
@@ -172,6 +169,71 @@ public class GiraphJob {
*/
public static final int TCP_BACKLOG_DEFAULT = 1;
+ /** How big to make the default buffer? */
+ public static final String NETTY_REQUEST_ENCODER_BUFFER_SIZE =
+ "giraph.nettyRequestEncoderBufferSize";
+ /** Start with 32K */
+ public static final int NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT =
+ 32 * 1024;
+
+ /** Netty client threads */
+ public static final String NETTY_CLIENT_THREADS =
+ "giraph.nettyClientThreads";
+ /** Default is 4 */
+ public static final int NETTY_CLIENT_THREADS_DEFAULT = 4;
+
+ /** Netty server threads */
+ public static final String NETTY_SERVER_THREADS =
+ "giraph.nettyServerThreads";
+ /** Default is 16 */
+ public static final int NETTY_SERVER_THREADS_DEFAULT = 16;
+
+ /** Use the execution handler in netty on the client? */
+ public static final String NETTY_CLIENT_USE_EXECUTION_HANDLER =
+ "giraph.nettyClientUseExecutionHandler";
+ /** Use the execution handler in netty on the client - default true */
+ public static final boolean NETTY_CLIENT_USE_EXECUTION_HANDLER_DEFAULT =
+ true;
+
+ /** Netty client execution threads (execution handler) */
+ public static final String NETTY_CLIENT_EXECUTION_THREADS =
+ "giraph.nettyClientExecutionThreads";
+ /** Default Netty client execution threads (execution handler) of 8 */
+ public static final int NETTY_CLIENT_EXECUTION_THREADS_DEFAULT = 8;
+
+ /** Where to place the netty client execution handle? */
+ public static final String NETTY_CLIENT_EXECUTION_AFTER_HANDLER =
+ "giraph.nettyClientExecutionAfterHandler";
+ /**
+ * Default is to use the netty client execution handle after the request
+ * encoder.
+ */
+ public static final String NETTY_CLIENT_EXECUTION_AFTER_HANDLER_DEFAULT =
+ "requestEncoder";
+
+ /** Use the execution handler in netty on the server? */
+ public static final String NETTY_SERVER_USE_EXECUTION_HANDLER =
+ "giraph.nettyServerUseExecutionHandler";
+ /** Use the execution handler in netty on the server - default true */
+ public static final boolean NETTY_SERVER_USE_EXECUTION_HANDLER_DEFAULT =
+ true;
+
+ /** Netty server execution threads (execution handler) */
+ public static final String NETTY_SERVER_EXECUTION_THREADS =
+ "giraph.nettyServerExecutionThreads";
+ /** Default Netty server execution threads (execution handler) of 8 */
+ public static final int NETTY_SERVER_EXECUTION_THREADS_DEFAULT = 8;
+
+ /** Where to place the netty server execution handle? */
+ public static final String NETTY_SERVER_EXECUTION_AFTER_HANDLER =
+ "giraph.nettyServerExecutionAfterHandler";
+ /**
+ * Default is to use the netty server execution handle after the request
+ * frame decoder.
+ */
+ public static final String NETTY_SERVER_EXECUTION_AFTER_HANDLER_DEFAULT =
+ "requestFrameDecoder";
+
/** Netty simulate a first request closed */
public static final String NETTY_SIMULATE_FIRST_REQUEST_CLOSED =
"giraph.nettySimulateFirstRequestClosed";
@@ -483,92 +545,18 @@ public class GiraphJob {
/** Default ZooKeeper maximum session timeout of 10 minutes (in msecs). */
public static final int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 600 * 1000;
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(GiraphJob.class);
-
- /** Internal job that actually is submitted */
- private final Job job;
- /** Helper configuration from the job */
- private final Configuration conf;
-
-
/**
- * Constructor that will instantiate the configuration
- *
- * @param jobName User-defined job name
- * @throws IOException
+ * Constructor that creates the configuration
*/
- public GiraphJob(String jobName) throws IOException {
- this(new Configuration(), jobName);
- }
+ public GiraphConfiguration() { }
/**
* Constructor.
*
- * @param conf User-defined configuration
- * @param jobName User-defined job name
- * @throws IOException
- */
- public GiraphJob(Configuration conf, String jobName) throws IOException {
- job = new Job(conf, jobName);
- this.conf = job.getConfiguration();
- }
-
- /**
- * Get the configuration from the internal job.
- *
- * @return Configuration used by the job.
- */
- public Configuration getConfiguration() {
- return conf;
- }
-
- /**
- * Be very cautious when using this method as it returns the internal job
- * of {@link GiraphJob}. This should only be used for methods that require
- * access to the actual {@link Job}, i.e. FileInputFormat#addInputPath().
- *
- * @return Internal job that will actually be submitted to Hadoop.
- */
- public Job getInternalJob() {
- return job;
- }
- /**
- * Make sure the configuration is set properly by the user prior to
- * submitting the job.
+ * @param conf Configuration
*/
- private void checkConfiguration() {
- if (conf.getInt(MAX_WORKERS, -1) < 0) {
- throw new RuntimeException("No valid " + MAX_WORKERS);
- }
- if (conf.getFloat(MIN_PERCENT_RESPONDED,
- MIN_PERCENT_RESPONDED_DEFAULT) <= 0.0f ||
- conf.getFloat(MIN_PERCENT_RESPONDED,
- MIN_PERCENT_RESPONDED_DEFAULT) > 100.0f) {
- throw new IllegalArgumentException(
- "Invalid " +
- conf.getFloat(MIN_PERCENT_RESPONDED,
- MIN_PERCENT_RESPONDED_DEFAULT) + " for " +
- MIN_PERCENT_RESPONDED);
- }
- if (conf.getInt(MIN_WORKERS, -1) < 0) {
- throw new IllegalArgumentException("No valid " + MIN_WORKERS);
- }
- if (BspUtils.getVertexClass(getConfiguration()) == null) {
- throw new IllegalArgumentException("GiraphJob: Null VERTEX_CLASS");
- }
- if (BspUtils.getVertexInputFormatClass(getConfiguration()) == null) {
- throw new IllegalArgumentException(
- "GiraphJob: Null VERTEX_INPUT_FORMAT_CLASS");
- }
- if (BspUtils.getVertexResolverClass(getConfiguration()) == null) {
- setVertexResolverClass(VertexResolver.class);
- if (LOG.isInfoEnabled()) {
- LOG.info("GiraphJob: No class found for " +
- VERTEX_RESOLVER_CLASS + ", defaulting to " +
- VertexResolver.class.getCanonicalName());
- }
- }
+ public GiraphConfiguration(Configuration conf) {
+ super(conf);
}
/**
@@ -576,8 +564,9 @@ public class GiraphJob {
*
* @param vertexClass Runs vertex computation
*/
- public final void setVertexClass(Class<?> vertexClass) {
- getConfiguration().setClass(VERTEX_CLASS, vertexClass, Vertex.class);
+ public final void setVertexClass(
+ Class<? extends Vertex> vertexClass) {
+ setClass(VERTEX_CLASS, vertexClass, Vertex.class);
}
/**
@@ -586,8 +575,8 @@ public class GiraphJob {
* @param vertexInputFormatClass Determines how graph is input
*/
public final void setVertexInputFormatClass(
- Class<?> vertexInputFormatClass) {
- getConfiguration().setClass(VERTEX_INPUT_FORMAT_CLASS,
+ Class<? extends VertexInputFormat> vertexInputFormatClass) {
+ setClass(VERTEX_INPUT_FORMAT_CLASS,
vertexInputFormatClass,
VertexInputFormat.class);
}
@@ -597,8 +586,9 @@ public class GiraphJob {
*
* @param masterComputeClass Runs master computation
*/
- public final void setMasterComputeClass(Class<?> masterComputeClass) {
- getConfiguration().setClass(MASTER_COMPUTE_CLASS, masterComputeClass,
+ public final void setMasterComputeClass(
+ Class<? extends MasterCompute> masterComputeClass) {
+ setClass(MASTER_COMPUTE_CLASS, masterComputeClass,
MasterCompute.class);
}
@@ -608,8 +598,8 @@ public class GiraphJob {
* @param vertexOutputFormatClass Determines how graph is output
*/
public final void setVertexOutputFormatClass(
- Class<?> vertexOutputFormatClass) {
- getConfiguration().setClass(VERTEX_OUTPUT_FORMAT_CLASS,
+ Class<? extends VertexOutputFormat> vertexOutputFormatClass) {
+ setClass(VERTEX_OUTPUT_FORMAT_CLASS,
vertexOutputFormatClass,
VertexOutputFormat.class);
}
@@ -619,8 +609,9 @@ public class GiraphJob {
*
* @param vertexCombinerClass Determines how vertex messages are combined
*/
- public final void setVertexCombinerClass(Class<?> vertexCombinerClass) {
- getConfiguration().setClass(VERTEX_COMBINER_CLASS,
+ public final void setVertexCombinerClass(
+ Class<? extends VertexCombiner> vertexCombinerClass) {
+ setClass(VERTEX_COMBINER_CLASS,
vertexCombinerClass,
VertexCombiner.class);
}
@@ -632,7 +623,7 @@ public class GiraphJob {
*/
public final void setGraphPartitionerFactoryClass(
Class<?> graphPartitionerFactoryClass) {
- getConfiguration().setClass(GRAPH_PARTITIONER_FACTORY_CLASS,
+ setClass(GRAPH_PARTITIONER_FACTORY_CLASS,
graphPartitionerFactoryClass,
GraphPartitionerFactory.class);
}
@@ -642,8 +633,9 @@ public class GiraphJob {
*
* @param vertexResolverClass Determines how vertex mutations are resolved
*/
- public final void setVertexResolverClass(Class<?> vertexResolverClass) {
- getConfiguration().setClass(VERTEX_RESOLVER_CLASS,
+ public final void setVertexResolverClass(
+ Class<? extends VertexResolver> vertexResolverClass) {
+ setClass(VERTEX_RESOLVER_CLASS,
vertexResolverClass,
VertexResolver.class);
}
@@ -654,8 +646,9 @@ public class GiraphJob {
* @param workerContextClass Determines what code is executed on a each
* worker before and after each superstep and computation
*/
- public final void setWorkerContextClass(Class<?> workerContextClass) {
- getConfiguration().setClass(WORKER_CONTEXT_CLASS,
+ public final void setWorkerContextClass(
+ Class<? extends WorkerContext> workerContextClass) {
+ setClass(WORKER_CONTEXT_CLASS,
workerContextClass,
WorkerContext.class);
}
@@ -667,8 +660,8 @@ public class GiraphJob {
* written to file at the end of the job
*/
public final void setAggregatorWriterClass(
- Class<?> aggregatorWriterClass) {
- getConfiguration().setClass(AGGREGATOR_WRITER_CLASS,
+ Class<? extends AggregatorWriter> aggregatorWriterClass) {
+ setClass(AGGREGATOR_WRITER_CLASS,
aggregatorWriterClass,
AggregatorWriter.class);
}
@@ -684,11 +677,23 @@ public class GiraphJob {
* have responded before continuing the superstep
*/
public final void setWorkerConfiguration(int minWorkers,
- int maxWorkers,
- float minPercentResponded) {
- conf.setInt(MIN_WORKERS, minWorkers);
- conf.setInt(MAX_WORKERS, maxWorkers);
- conf.setFloat(MIN_PERCENT_RESPONDED, minPercentResponded);
+ int maxWorkers,
+ float minPercentResponded) {
+ setInt(MIN_WORKERS, minWorkers);
+ setInt(MAX_WORKERS, maxWorkers);
+ setFloat(MIN_PERCENT_RESPONDED, minPercentResponded);
+ }
+
+ public final int getMinWorkers() {
+ return getInt(MIN_WORKERS, -1);
+ }
+
+ public final int getMaxWorkers() {
+ return getInt(MAX_WORKERS, -1);
+ }
+
+ public final float getMinPercentResponded() {
+ return getFloat(MIN_PERCENT_RESPONDED, MIN_PERCENT_RESPONDED_DEFAULT);
}
/**
@@ -699,90 +704,87 @@ public class GiraphJob {
* (i.e. zk1:2221,zk2:2221)
*/
public final void setZooKeeperConfiguration(String serverList) {
- conf.set(ZOOKEEPER_LIST, serverList);
+ set(ZOOKEEPER_LIST, serverList);
+ }
+
+ public final boolean getSplitMasterWorker() {
+ return getBoolean(SPLIT_MASTER_WORKER, SPLIT_MASTER_WORKER_DEFAULT);
}
/**
- * Check if the configuration is local. If it is local, do additional
- * checks due to the restrictions of LocalJobRunner.
+ * Get the task partition
*
- * @param conf Configuration
+ * @return The task partition or -1 if not set
*/
- private static void checkLocalJobRunnerConfiguration(
- Configuration conf) {
- String jobTracker = conf.get("mapred.job.tracker", null);
- if (!jobTracker.equals("local")) {
- // Nothing to check
- return;
- }
-
- int maxWorkers = conf.getInt(MAX_WORKERS, -1);
- if (maxWorkers != 1) {
- throw new IllegalArgumentException(
- "checkLocalJobRunnerConfiguration: When using " +
- "LocalJobRunner, must have only one worker since " +
- "only 1 task at a time!");
- }
- if (conf.getBoolean(SPLIT_MASTER_WORKER,
- SPLIT_MASTER_WORKER_DEFAULT)) {
- throw new IllegalArgumentException(
- "checkLocalJobRunnerConfiguration: When using " +
- "LocalJobRunner, you cannot run in split master / worker " +
- "mode since there is only 1 task at a time!");
- }
+ public int getTaskPartition() {
+ return getInt("mapred.task.partition", -1);
}
/**
- * Check whether a specified int conf value is set and if not, set it.
+ * Get the ZooKeeper list.
*
- * @param param Conf value to check
- * @param defaultValue Assign to value if not set
+ * @return ZooKeeper list of strings, comma separated or null if none set.
*/
- private void setIntConfIfDefault(String param, int defaultValue) {
- if (conf.getInt(param, Integer.MIN_VALUE) == Integer.MIN_VALUE) {
- conf.setInt(param, defaultValue);
- }
+ public String getZookeeperList() {
+ return get(ZOOKEEPER_LIST);
+ }
+
+ public String getLocalLevel() {
+ return get(LOG_LEVEL, LOG_LEVEL_DEFAULT);
+ }
+
+ public boolean getLocalTestMode() {
+ return getBoolean(LOCAL_TEST_MODE, LOCAL_TEST_MODE_DEFAULT);
+ }
+
+ public boolean getUseNetty() {
+ return getBoolean(USE_NETTY, USE_NETTY_DEFAULT);
+ }
+
+ public int getZooKeeperServerCount() {
+ return getInt(ZOOKEEPER_SERVER_COUNT,
+ ZOOKEEPER_SERVER_COUNT_DEFAULT);
}
/**
- * Runs the actual graph application through Hadoop Map-Reduce.
+ * Set the ZooKeeper jar classpath
*
- * @param verbose If true, provide verbose output, false otherwise
- * @return True if success, false otherwise
- * @throws ClassNotFoundException
- * @throws InterruptedException
- * @throws IOException
- */
- public final boolean run(boolean verbose)
- throws IOException, InterruptedException, ClassNotFoundException {
- checkConfiguration();
- checkLocalJobRunnerConfiguration(conf);
- job.setNumReduceTasks(0);
- // Most users won't hit this hopefully and can set it higher if desired
- setIntConfIfDefault("mapreduce.job.counters.limit", 512);
-
- // Capacity scheduler-specific settings. These should be enough for
- // a reasonable Giraph job
- setIntConfIfDefault("mapred.job.map.memory.mb", 1024);
- setIntConfIfDefault("mapred.job.reduce.memory.mb", 1024);
-
- // Speculative execution doesn't make sense for Giraph
- conf.setBoolean("mapred.map.tasks.speculative.execution", false);
-
- // Set the ping interval to 5 minutes instead of one minute
- // (DEFAULT_PING_INTERVAL)
- Client.setPingInterval(conf, 60000 * 5);
+ * @param classPath Classpath for the ZooKeeper jar
+ */
+ public void setZooKeeperJar(String classPath) {
+ set(ZOOKEEPER_JAR, classPath);
+ }
+
+ public int getZooKeeperSessionTimeout() {
+ return getInt(ZOOKEEPER_SESSION_TIMEOUT,
+ ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
+ }
- if (job.getJar() == null) {
- job.setJarByClass(GiraphJob.class);
+ public boolean getNettyServerUseExecutionHandler() {
+ return getBoolean(NETTY_SERVER_USE_EXECUTION_HANDLER,
+ NETTY_SERVER_USE_EXECUTION_HANDLER_DEFAULT);
+ }
+
+ public int getNettyServerThreads() {
+ return getInt(NETTY_SERVER_THREADS, NETTY_SERVER_THREADS_DEFAULT);
+ }
+
+ public int getNettyServerExecutionThreads() {
+ return getInt(NETTY_SERVER_EXECUTION_THREADS,
+ NETTY_SERVER_EXECUTION_THREADS_DEFAULT);
+ }
+
+ /**
+ * Get the netty server execution concurrency. This depends on whether the
+ * netty server execution handler exists.
+ *
+ * @return Server concurrency
+ */
+ public int getNettyServerExecutionConcurrency() {
+ if (getNettyServerUseExecutionHandler()) {
+ return getNettyServerExecutionThreads();
+ } else {
+ return getNettyServerThreads();
}
- // Should work in MAPREDUCE-1938 to let the user jars/classes
- // get loaded first
- conf.setBoolean("mapreduce.user.classpath.first", true);
-
- job.setMapperClass(GraphMapper.class);
- job.setInputFormatClass(BspInputFormat.class);
- job.setOutputFormatClass(BspOutputFormat.class);
- return job.waitForCompletion(verbose);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/GiraphRunner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/GiraphRunner.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/GiraphRunner.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/GiraphRunner.java Tue Sep 25 17:40:18 2012
@@ -23,9 +23,15 @@ import org.apache.commons.cli.CommandLin
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.giraph.examples.Algorithm;
+import org.apache.giraph.graph.AggregatorWriter;
import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.GiraphTypeValidator;
+import org.apache.giraph.graph.MasterCompute;
import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexCombiner;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.WorkerContext;
import org.apache.giraph.utils.AnnotationUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
@@ -159,14 +165,21 @@ public class GiraphRunner implements Too
}
int workers = Integer.parseInt(cmd.getOptionValue('w'));
- GiraphJob job = new GiraphJob(getConf(), "Giraph: " + vertexClassName);
- job.setVertexClass(Class.forName(vertexClassName));
- job.setVertexInputFormatClass(Class.forName(cmd.getOptionValue("if")));
- job.setVertexOutputFormatClass(Class.forName(cmd.getOptionValue("of")));
+ GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
+ giraphConfiguration.setVertexClass(
+ (Class<? extends Vertex>) Class.forName(vertexClassName));
+ giraphConfiguration.setVertexInputFormatClass(
+ (Class<? extends VertexInputFormat>)
+ Class.forName(cmd.getOptionValue("if")));
+ giraphConfiguration.setVertexOutputFormatClass(
+ (Class<? extends VertexOutputFormat>)
+ Class.forName(cmd.getOptionValue("of")));
+ GiraphJob job = new GiraphJob(
+ giraphConfiguration, "Giraph: " + vertexClassName);
if (cmd.hasOption("ip")) {
FileInputFormat.addInputPath(job.getInternalJob(),
- new Path(cmd.getOptionValue("ip")));
+ new Path(cmd.getOptionValue("ip")));
} else {
if (LOG.isInfoEnabled()) {
LOG.info("No input path specified. Ensure your InputFormat does" +
@@ -185,19 +198,27 @@ public class GiraphRunner implements Too
}
if (cmd.hasOption("c")) {
- job.setVertexCombinerClass(Class.forName(cmd.getOptionValue("c")));
+ giraphConfiguration.setVertexCombinerClass(
+ (Class<? extends VertexCombiner>)
+ Class.forName(cmd.getOptionValue("c")));
}
if (cmd.hasOption("wc")) {
- job.setWorkerContextClass(Class.forName(cmd.getOptionValue("wc")));
+ giraphConfiguration.setWorkerContextClass(
+ (Class<? extends WorkerContext>)
+ Class.forName(cmd.getOptionValue("wc")));
}
if (cmd.hasOption("mc")) {
- job.setMasterComputeClass(Class.forName(cmd.getOptionValue("mc")));
+ giraphConfiguration.setMasterComputeClass(
+ (Class<? extends MasterCompute>)
+ Class.forName(cmd.getOptionValue("mc")));
}
if (cmd.hasOption("aw")) {
- job.setAggregatorWriterClass(Class.forName(cmd.getOptionValue("aw")));
+ giraphConfiguration.setAggregatorWriterClass(
+ (Class<? extends AggregatorWriter>)
+ Class.forName(cmd.getOptionValue("aw")));
}
if (cmd.hasOption("cf")) {
@@ -230,10 +251,9 @@ public class GiraphRunner implements Too
new GiraphTypeValidator(job.getConfiguration());
validator.validateClassTypes();
- job.setWorkerConfiguration(workers, workers, 100.0f);
+ giraphConfiguration.setWorkerConfiguration(workers, workers, 100.0f);
boolean verbose = !cmd.hasOption('q');
-
return job.run(verbose) ? 0 : -1;
}
Copied: giraph/trunk/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfigurable.java (from r1388628, giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java)
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfigurable.java?p2=giraph/trunk/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfigurable.java&p1=giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java&r1=1388628&r2=1390014&rev=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfigurable.java Tue Sep 25 17:40:18 2012
@@ -16,35 +16,34 @@
* limitations under the License.
*/
-package org.apache.giraph.graph.partition;
+package org.apache.giraph;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
/**
- * Defines the partitioning framework for this application.
+ * Can be instantiated with ImmutableClassesGiraphConfiguration
*
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
*/
-@SuppressWarnings("rawtypes")
-public interface GraphPartitionerFactory<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
+public interface ImmutableClassesGiraphConfigurable<
+ I extends WritableComparable, V extends Writable, E extends Writable,
+ M extends Writable> {
/**
- * Create the {@link MasterGraphPartitioner} used by the master.
- * Instantiated once by the master and reused.
+ * Set the configuration to be used by this object.
*
- * @return Instantiated master graph partitioner
+ * @param configuration Set configuration
*/
- MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner();
+ void setConf(ImmutableClassesGiraphConfiguration<I, V, E, M>
+ configuration);
/**
- * Create the {@link WorkerGraphPartitioner} used by the worker.
- * Instantiated once by every worker and reused.
+ * Return the configuration used by this object.
*
- * @return Instantiated worker graph partitioner
+ * @return Set configuration
*/
- WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner();
+ ImmutableClassesGiraphConfiguration<I, V, E, M> getConf();
}
Added: giraph/trunk/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java?rev=1390014&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java Tue Sep 25 17:40:18 2012
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import java.util.List;
+import org.apache.giraph.graph.AggregatorWriter;
+import org.apache.giraph.graph.DefaultMasterCompute;
+import org.apache.giraph.graph.DefaultWorkerContext;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.MasterCompute;
+import org.apache.giraph.graph.TextAggregatorWriter;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexCombiner;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.graph.WorkerContext;
+import org.apache.giraph.graph.partition.GraphPartitionerFactory;
+import org.apache.giraph.graph.partition.HashPartitionerFactory;
+import org.apache.giraph.graph.partition.MasterGraphPartitioner;
+import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * The classes set here are immutable, the remaining configuration is mutable.
+ * Classes are immutable and final to provide the best performance for
+ * instantiation. Everything is thread-safe.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable> extends
+ GiraphConfiguration {
+ /** Vertex class - cached for fast access */
+ private final Class<? extends Vertex<I, V, E, M>> vertexClass;
+ /** Vertex id class - cached for fast access */
+ private final Class<I> vertexIdClass;
+ /** Vertex value class - cached for fast access */
+ private final Class<V> vertexValueClass;
+ /** Edge value class - cached for fast access */
+ private final Class<E> edgeValueClass;
+ /** Message value class - cached for fast access */
+ private final Class<M> messageValueClass;
+
+ /** Graph partitioner factory class - cached for fast access */
+ private final Class<? extends GraphPartitionerFactory<I, V, E, M>>
+ graphPartitionerFactoryClass;
+ /** Master graph partitioner - cached for fast access */
+ private final MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner;
+
+ /** Vertex input format class - cached for fast access */
+ private final Class<? extends VertexInputFormat<I, V, E, M>>
+ vertexInputFormatClass;
+ /** Vertex output format class - cached for fast access */
+ private final Class<? extends VertexOutputFormat<I, V, E>>
+ vertexOutputFormatClass;
+
+ /** Aggregator writer class - cached for fast access */
+ private final Class<? extends AggregatorWriter> aggregatorWriterClass;
+ /** Vertex combiner class - cached for fast access */
+ private final Class<? extends VertexCombiner<I, M>> vertexCombinerClass;
+ /** Vertex resolver class - cached for fast access */
+ private final Class<? extends VertexResolver<I, V, E, M>>
+ vertexResolverClass;
+ /** Worker context class - cached for fast access */
+ private final Class<? extends WorkerContext> workerContextClass;
+ /** Master compute class - cached for fast access */
+ private final Class<? extends MasterCompute> masterComputeClass;
+
+ /**
+ * Constructor. Takes the configuration and then gets the classes out of
+ * them for Giraph
+ *
+ * @param conf Configuration
+ */
+ public ImmutableClassesGiraphConfiguration(Configuration conf) {
+ super(conf);
+ // set pre-validated generic parameter types into Configuration
+ vertexClass = (Class<? extends Vertex<I, V, E, M>>)
+ conf.getClass(VERTEX_CLASS, null, Vertex.class);
+ List<Class<?>> classList =
+ org.apache.giraph.utils.ReflectionUtils.<Vertex>getTypeArguments(
+ Vertex.class, vertexClass);
+ vertexIdClass = (Class<I>) classList.get(0);
+ vertexValueClass = (Class<V>) classList.get(1);
+ edgeValueClass = (Class<E>) classList.get(2);
+ messageValueClass = (Class<M>) classList.get(3);
+
+ graphPartitionerFactoryClass =
+ (Class<? extends GraphPartitionerFactory<I, V, E, M>>)
+ conf.getClass(GRAPH_PARTITIONER_FACTORY_CLASS,
+ HashPartitionerFactory.class,
+ GraphPartitionerFactory.class);
+ masterGraphPartitioner =
+ (MasterGraphPartitioner<I, V, E, M>)
+ createGraphPartitioner().createMasterGraphPartitioner();
+
+ vertexInputFormatClass = (Class<? extends VertexInputFormat<I, V, E, M>>)
+ conf.getClass(VERTEX_INPUT_FORMAT_CLASS,
+ null, VertexInputFormat.class);
+ vertexOutputFormatClass = (Class<? extends VertexOutputFormat<I, V, E>>)
+ conf.getClass(VERTEX_OUTPUT_FORMAT_CLASS,
+ null, VertexOutputFormat.class);
+
+ aggregatorWriterClass = conf.getClass(AGGREGATOR_WRITER_CLASS,
+ TextAggregatorWriter.class, AggregatorWriter.class);
+ vertexCombinerClass = (Class<? extends VertexCombiner<I, M>>)
+ conf.getClass(VERTEX_COMBINER_CLASS,
+ null, VertexCombiner.class);
+ vertexResolverClass = (Class<? extends VertexResolver<I, V, E, M>>)
+ conf.getClass(VERTEX_RESOLVER_CLASS,
+ VertexResolver.class, VertexResolver.class);
+ workerContextClass = conf.getClass(WORKER_CONTEXT_CLASS,
+ DefaultWorkerContext.class, WorkerContext.class);
+ masterComputeClass = conf.getClass(MASTER_COMPUTE_CLASS,
+ DefaultMasterCompute.class, MasterCompute.class);
+ }
+
+ /**
+ * Get the user's subclassed
+ * {@link org.apache.giraph.graph.partition.GraphPartitionerFactory}.
+ *
+ * @return User's graph partitioner
+ */
+ public Class<? extends GraphPartitionerFactory<I, V, E, M>>
+ getGraphPartitionerClass() {
+ return graphPartitionerFactoryClass;
+ }
+
+ /**
+ * Create a user graph partitioner class
+ *
+ * @return Instantiated user graph partitioner class
+ */
+ public GraphPartitionerFactory<I, V, E, M> createGraphPartitioner() {
+ return ReflectionUtils.newInstance(graphPartitionerFactoryClass, this);
+ }
+
+ /**
+ * Create a user graph partitioner partition stats class
+ *
+ * @return Instantiated user graph partition stats class
+ */
+ public PartitionStats createGraphPartitionStats() {
+ return masterGraphPartitioner.createPartitionStats();
+ }
+
+ /**
+ * Get the user's subclassed
+ * {@link org.apache.giraph.graph.VertexInputFormat}.
+ *
+ * @return User's vertex input format class
+ */
+ public Class<? extends VertexInputFormat<I, V, E, M>>
+ getVertexInputFormatClass() {
+ return vertexInputFormatClass;
+ }
+
+ /**
+ * Create a user vertex input format class
+ *
+ * @return Instantiated user vertex input format class
+ */
+ public VertexInputFormat<I, V, E, M>
+ createVertexInputFormat() {
+ return ReflectionUtils.newInstance(vertexInputFormatClass, this);
+ }
+
+ /**
+ * Get the user's subclassed
+ * {@link org.apache.giraph.graph.VertexOutputFormat}.
+ *
+ * @return User's vertex output format class
+ */
+ public Class<? extends VertexOutputFormat<I, V, E>>
+ getVertexOutputFormatClass() {
+ return vertexOutputFormatClass;
+ }
+
+ /**
+ * Create a user vertex output format class
+ *
+ * @return Instantiated user vertex output format class
+ */
+ @SuppressWarnings("rawtypes")
+ public VertexOutputFormat<I, V, E> createVertexOutputFormat() {
+ return ReflectionUtils.newInstance(vertexOutputFormatClass, this);
+ }
+
+ /**
+ * Get the user's subclassed {@link org.apache.giraph.graph.AggregatorWriter}.
+ *
+ * @return User's aggregator writer class
+ */
+ public Class<? extends AggregatorWriter> getAggregatorWriterClass() {
+ return aggregatorWriterClass;
+ }
+
+ /**
+ * Create a user aggregator output format class
+ *
+ * @return Instantiated user aggregator writer class
+ */
+ public AggregatorWriter createAggregatorWriter() {
+ return ReflectionUtils.newInstance(aggregatorWriterClass, this);
+ }
+
+ /**
+ * Get the user's subclassed {@link org.apache.giraph.graph.VertexCombiner}.
+ *
+ * @return User's vertex combiner class
+ */
+ public Class<? extends VertexCombiner<I, M>> getVertexCombinerClass() {
+ return vertexCombinerClass;
+ }
+
+ /**
+ * Create a user vertex combiner class
+ *
+ * @return Instantiated user vertex combiner class
+ */
+ @SuppressWarnings("rawtypes")
+ public VertexCombiner<I, M> createVertexCombiner() {
+ return ReflectionUtils.newInstance(vertexCombinerClass, this);
+ }
+
+ /**
+ * Get the user's subclassed VertexResolver.
+ *
+ * @return User's vertex resolver class
+ */
+ public Class<? extends VertexResolver<I, V, E, M>> getVertexResolverClass() {
+ return vertexResolverClass;
+ }
+
+ /**
+ * Create a user vertex revolver
+ *
+ * @param graphState State of the graph from the worker
+ * @return Instantiated user vertex resolver
+ */
+ @SuppressWarnings("rawtypes")
+ public VertexResolver<I, V, E, M> createVertexResolver(
+ GraphState<I, V, E, M> graphState) {
+ VertexResolver<I, V, E, M> resolver =
+ ReflectionUtils.newInstance(vertexResolverClass, this);
+ resolver.setGraphState(graphState);
+ return resolver;
+ }
+
+ /**
+ * Get the user's subclassed WorkerContext.
+ *
+ * @return User's worker context class
+ */
+ public Class<? extends WorkerContext> getWorkerContextClass() {
+ return workerContextClass;
+ }
+
+ /**
+ * Create a user worker context
+ *
+ * @param graphState State of the graph from the worker
+ * @return Instantiated user worker context
+ */
+ @SuppressWarnings("rawtypes")
+ public WorkerContext createWorkerContext(GraphState<I, V, E, M> graphState) {
+ WorkerContext workerContext =
+ ReflectionUtils.newInstance(workerContextClass, this);
+ workerContext.setGraphState(graphState);
+ return workerContext;
+ }
+
+ /**
+ * Get the user's subclassed {@link org.apache.giraph.graph.MasterCompute}
+ *
+ * @return User's master class
+ */
+ public Class<? extends MasterCompute> getMasterComputeClass() {
+ return masterComputeClass;
+ }
+
+ /**
+ * Create a user master
+ *
+ * @return Instantiated user master
+ */
+ public MasterCompute createMasterCompute() {
+ return ReflectionUtils.newInstance(masterComputeClass, this);
+ }
+
+ /**
+ * Get the user's subclassed {@link org.apache.giraph.graph.Vertex}
+ *
+ * @return User's vertex class
+ */
+ public Class<? extends Vertex<I, V, E, M>> getVertexClass() {
+ return vertexClass;
+ }
+
+ /**
+ * Create a user vertex
+ *
+ * @return Instantiated user vertex
+ */
+ public Vertex<I, V, E, M> createVertex() {
+ return ReflectionUtils.newInstance(vertexClass, this);
+ }
+
+ /**
+ * Get the user's subclassed vertex index class.
+ *
+ * @return User's vertex index class
+ */
+ public Class<I> getVertexIdClass() {
+ return vertexIdClass;
+ }
+
+ /**
+ * Create a user vertex index
+ *
+ * @return Instantiated user vertex index
+ */
+ public I createVertexId() {
+ try {
+ return vertexIdClass.newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException(
+ "createVertexId: Failed to instantiate", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException(
+ "createVertexId: Illegally accessed", e);
+ }
+ }
+
+ /**
+ * Get the user's subclassed vertex value class.
+ *
+ * @return User's vertex value class
+ */
+ public Class<V> getVertexValueClass() {
+ return vertexValueClass;
+ }
+
+ /**
+ * Create a user vertex value
+ *
+ * @return Instantiated user vertex value
+ */
+ @SuppressWarnings("unchecked")
+ public V createVertexValue() {
+ if (vertexValueClass == NullWritable.class) {
+ return (V) NullWritable.get();
+ } else {
+ try {
+ return vertexValueClass.newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException(
+ "createVertexValue: Failed to instantiate", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException(
+ "createVertexValue: Illegally accessed", e);
+ }
+ }
+ }
+
+ /**
+ * Get the user's subclassed edge value class.
+ *
+ * @return User's vertex edge value class
+ */
+ public Class<E> getEdgeValueClass() {
+ return edgeValueClass;
+ }
+
+ /**
+ * Create a user edge value
+ *
+ * @return Instantiated user edge value
+ */
+ public E createEdgeValue() {
+ if (edgeValueClass == NullWritable.class) {
+ return (E) NullWritable.get();
+ } else {
+ try {
+ return edgeValueClass.newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException(
+ "createEdgeValue: Failed to instantiate", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException(
+ "createEdgeValue: Illegally accessed", e);
+ }
+ }
+ }
+
+ /**
+ * Get the user's subclassed vertex message value class.
+ *
+ * @return User's vertex message value class
+ */
+ @SuppressWarnings("unchecked")
+ public Class<M> getMessageValueClass() {
+ return messageValueClass;
+ }
+
+ /**
+ * Create a user vertex message value
+ *
+ * @return Instantiated user vertex message value
+ */
+ public M createMessageValue() {
+ if (messageValueClass == NullWritable.class) {
+ return (M) NullWritable.get();
+ } else {
+ try {
+ return messageValueClass.newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException(
+ "createMessageValue: Failed to instantiate", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException(
+ "createMessageValue: Illegally accessed", e);
+ }
+ }
+ }
+}
Copied: giraph/trunk/src/main/java/org/apache/giraph/benchmark/EdgeListVertexPageRankBenchmark.java (from r1388628, giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java)
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/benchmark/EdgeListVertexPageRankBenchmark.java?p2=giraph/trunk/src/main/java/org/apache/giraph/benchmark/EdgeListVertexPageRankBenchmark.java&p1=giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java&r1=1388628&r2=1390014&rev=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/benchmark/EdgeListVertexPageRankBenchmark.java Tue Sep 25 17:40:18 2012
@@ -16,39 +16,23 @@
* limitations under the License.
*/
-package org.apache.giraph.comm.netty;
+package org.apache.giraph.benchmark;
-import org.apache.giraph.comm.netty.handler.MasterRequestServerHandler;
-import org.apache.giraph.comm.MasterServer;
-import org.apache.hadoop.conf.Configuration;
-
-import java.net.InetSocketAddress;
+import java.io.IOException;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
/**
- * Netty implementation of {@link MasterServer}
+ * PageRank benchmark vertex backed by
+ * {@link org.apache.giraph.graph.EdgeListVertex}; it is the EdgeListVertex
+ * counterpart of HashMapVertexPageRankBenchmark, which uses
+ * {@link org.apache.giraph.graph.HashMapVertex}. The per-vertex PageRank
+ * logic itself lives in {@link PageRankComputation}.
*/
-public class NettyMasterServer implements MasterServer {
- /** Netty client that does the actual I/O */
- private final NettyServer nettyServer;
-
- /**
- * Constructor
- *
- * @param conf Hadoop configuration
- */
- public NettyMasterServer(Configuration conf) {
- nettyServer = new NettyServer(conf,
- new MasterRequestServerHandler.Factory());
- nettyServer.start();
- }
-
- @Override
- public InetSocketAddress getMyAddress() {
- return nettyServer.getMyAddress();
- }
-
+public class EdgeListVertexPageRankBenchmark extends EdgeListVertex<
+ LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
+ /** Delegates the per-vertex computation to PageRankComputation. */
@Override
- public void close() {
- nettyServer.stop();
+ public void compute(Iterable<DoubleWritable> messages) throws
+ IOException {
+ PageRankComputation.computePageRank(this, messages);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java Tue Sep 25 17:40:18 2012
@@ -23,37 +23,25 @@ import org.apache.commons.cli.CommandLin
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
+import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.examples.DoubleSumCombiner;
-import org.apache.giraph.graph.BspUtils;
-import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.io.PseudoRandomVertexInputFormat;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
-import java.io.IOException;
-
/**
- * Default Pregel-style PageRank computation using a {@link EdgeListVertex}.
+ * Default Pregel-style PageRank computation.
*/
-public class PageRankBenchmark extends EdgeListVertex<
- LongWritable, DoubleWritable, DoubleWritable, DoubleWritable>
- implements Tool {
+public class PageRankBenchmark implements Tool {
/** Class logger */
private static final Logger LOG = Logger.getLogger(PageRankBenchmark.class);
/** Configuration from Configurable */
private Configuration conf;
@Override
- public void compute(Iterable<DoubleWritable> messages) throws IOException {
- PageRankComputation.computePageRank(this, messages);
- }
-
- @Override
public Configuration getConf() {
return conf;
}
@@ -88,10 +76,15 @@ public class PageRankBenchmark extends E
"vertexClass",
true,
"Vertex class (0 for HashMapVertex, 1 for EdgeListVertex)");
+ options.addOption("N",
+ "name",
+ true,
+ "Name of the job");
options.addOption("nc",
"noCombiner",
false,
"Don't use a combiner");
+
HelpFormatter formatter = new HelpFormatter();
if (args.length == 0) {
formatter.printHelp(getClass().getName(), options, true);
@@ -122,20 +115,28 @@ public class PageRankBenchmark extends E
}
int workers = Integer.parseInt(cmd.getOptionValue('w'));
- GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+ String name = getClass().getName();
+ if (cmd.hasOption("N")) {
+ name = name + " " + cmd.getOptionValue("N");
+ }
+
+ GiraphJob job = new GiraphJob(getConf(), name);
if (!cmd.hasOption('c') ||
(Integer.parseInt(cmd.getOptionValue('c')) == 1)) {
- job.setVertexClass(PageRankBenchmark.class);
+ job.getConfiguration().setVertexClass(
+ EdgeListVertexPageRankBenchmark.class);
} else {
- job.setVertexClass(HashMapVertexPageRankBenchmark.class);
+ job.getConfiguration().setVertexClass(
+ HashMapVertexPageRankBenchmark.class);
}
LOG.info("Using class " +
- BspUtils.getVertexClass(job.getConfiguration()).getName());
+ job.getConfiguration().get(GiraphConfiguration.VERTEX_CLASS));
if (!cmd.hasOption("nc")) {
- job.setVertexCombinerClass(DoubleSumCombiner.class);
+ job.getConfiguration().setVertexCombinerClass(DoubleSumCombiner.class);
}
- job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
- job.setWorkerConfiguration(workers, workers, 100.0f);
+ job.getConfiguration().setVertexInputFormatClass(
+ PseudoRandomVertexInputFormat.class);
+ job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
job.getConfiguration().setLong(
PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
Long.parseLong(cmd.getOptionValue('V')));
Modified: giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java Tue Sep 25 17:40:18 2012
@@ -23,6 +23,7 @@ import org.apache.commons.cli.CommandLin
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
+import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.aggregators.LongSumAggregator;
import org.apache.giraph.graph.DefaultMasterCompute;
import org.apache.giraph.graph.EdgeListVertex;
@@ -352,12 +353,15 @@ public class RandomMessageBenchmark impl
}
int workers = Integer.parseInt(cmd.getOptionValue('w'));
GiraphJob job = new GiraphJob(getConf(), getClass().getName());
- job.getConfiguration().setInt(GiraphJob.CHECKPOINT_FREQUENCY, 0);
- job.setVertexClass(RandomMessageVertex.class);
- job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
- job.setWorkerContextClass(RandomMessageBenchmarkWorkerContext.class);
- job.setMasterComputeClass(RandomMessageBenchmarkMasterCompute.class);
- job.setWorkerConfiguration(workers, workers, 100.0f);
+ job.getConfiguration().setInt(GiraphConfiguration.CHECKPOINT_FREQUENCY, 0);
+ job.getConfiguration().setVertexClass(RandomMessageVertex.class);
+ job.getConfiguration().setVertexInputFormatClass(
+ PseudoRandomVertexInputFormat.class);
+ job.getConfiguration().setWorkerContextClass(
+ RandomMessageBenchmarkWorkerContext.class);
+ job.getConfiguration().setMasterComputeClass(
+ RandomMessageBenchmarkMasterCompute.class);
+ job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
job.getConfiguration().setLong(
PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
Long.parseLong(cmd.getOptionValue('V')));
@@ -383,7 +387,8 @@ public class RandomMessageBenchmark impl
Integer.parseInt(cmd.getOptionValue('s')));
}
if (cmd.hasOption('f')) {
- job.getConfiguration().setInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+ job.getConfiguration().setInt(
+ GiraphConfiguration.MSG_NUM_FLUSH_THREADS,
Integer.parseInt(cmd.getOptionValue('f')));
}
if (job.run(isVerbose)) {
Modified: giraph/trunk/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java Tue Sep 25 17:40:18 2012
@@ -23,8 +23,8 @@ import org.apache.commons.cli.CommandLin
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
+import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.examples.MinimumDoubleCombiner;
-import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.io.PseudoRandomVertexInputFormat;
@@ -40,19 +40,26 @@ import java.io.IOException;
/**
* Single-source shortest paths benchmark.
*/
-public class ShortestPathsBenchmark extends EdgeListVertex<LongWritable,
- DoubleWritable, DoubleWritable, DoubleWritable> implements Tool {
+public class ShortestPathsBenchmark implements Tool {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(ShortestPathsBenchmark.class);
/** Configuration */
private Configuration conf;
- @Override
- public void compute(Iterable<DoubleWritable> messages) throws IOException {
- ShortestPathsComputation.computeShortestPaths(this, messages);
+ /**
+ * {@link EdgeListVertex}-based vertex for the shortest-paths benchmark; the
+ * per-vertex logic lives in {@link ShortestPathsComputation}. This class is
+ * selected when option 'c' is absent or equal to 1. Declared static so it
+ * does not capture an enclosing ShortestPathsBenchmark instance.
+ */
+ public static class ShortestPathsBenchmarkVertex extends
+ EdgeListVertex<LongWritable, DoubleWritable, DoubleWritable,
+ DoubleWritable> {
+ /** Delegates the per-vertex computation to ShortestPathsComputation. */
+ @Override
+ public void compute(Iterable<DoubleWritable> messages) throws IOException {
+ ShortestPathsComputation.computeShortestPaths(this, messages);
+ }
}
+
@Override
public Configuration getConf() {
return conf;
@@ -117,17 +124,20 @@ public class ShortestPathsBenchmark exte
GiraphJob job = new GiraphJob(getConf(), getClass().getName());
if (!cmd.hasOption('c') ||
(Integer.parseInt(cmd.getOptionValue('c')) == 1)) {
- job.setVertexClass(ShortestPathsBenchmark.class);
+ job.getConfiguration().setVertexClass(ShortestPathsBenchmarkVertex.class);
} else {
- job.setVertexClass(HashMapVertexShortestPathsBenchmark.class);
+ job.getConfiguration().setVertexClass(
+ HashMapVertexShortestPathsBenchmark.class);
}
LOG.info("Using class " +
- BspUtils.getVertexClass(job.getConfiguration()).getName());
- job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
+ job.getConfiguration().get(GiraphConfiguration.VERTEX_CLASS));
+ job.getConfiguration().setVertexInputFormatClass(
+ PseudoRandomVertexInputFormat.class);
if (!cmd.hasOption("nc")) {
- job.setVertexCombinerClass(MinimumDoubleCombiner.class);
+ job.getConfiguration().setVertexCombinerClass(
+ MinimumDoubleCombiner.class);
}
- job.setWorkerConfiguration(workers, workers, 100.0f);
+ job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
job.getConfiguration().setLong(
PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
Long.parseLong(cmd.getOptionValue('V')));
Modified: giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputFormat.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputFormat.java Tue Sep 25 17:40:18 2012
@@ -22,7 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.GiraphConfiguration;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -49,15 +49,15 @@ public class BspInputFormat extends Inpu
* @return Maximum number of tasks
*/
public static int getMaxTasks(Configuration conf) {
- int maxWorkers = conf.getInt(GiraphJob.MAX_WORKERS, 0);
+ int maxWorkers = conf.getInt(GiraphConfiguration.MAX_WORKERS, 0);
boolean splitMasterWorker =
- conf.getBoolean(GiraphJob.SPLIT_MASTER_WORKER,
- GiraphJob.SPLIT_MASTER_WORKER_DEFAULT);
+ conf.getBoolean(GiraphConfiguration.SPLIT_MASTER_WORKER,
+ GiraphConfiguration.SPLIT_MASTER_WORKER_DEFAULT);
int maxTasks = maxWorkers;
if (splitMasterWorker) {
int zkServers =
- conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT,
- GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT);
+ conf.getInt(GiraphConfiguration.ZOOKEEPER_SERVER_COUNT,
+ GiraphConfiguration.ZOOKEEPER_SERVER_COUNT_DEFAULT);
maxTasks += zkServers;
}
if (LOG.isDebugEnabled()) {
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Tue Sep 25 17:40:18 2012
@@ -18,10 +18,11 @@
package org.apache.giraph.comm;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexCombiner;
import org.apache.giraph.graph.VertexMutations;
@@ -30,7 +31,6 @@ import org.apache.giraph.graph.WorkerInf
import org.apache.giraph.graph.partition.Partition;
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.giraph.utils.MemoryUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.ipc.RPC;
@@ -86,7 +86,7 @@ public abstract class BasicRPCCommunicat
/** Maximum number of vertices sent in a single RPC */
private static final int MAX_VERTICES_PER_RPC = 1024;
/** Hadoop configuration */
- protected final Configuration conf;
+ protected final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
/** Saved context for progress */
private final Mapper<?, ?, ?, ?>.Context context;
/** Indicates whether in superstep preparation */
@@ -438,22 +438,23 @@ public abstract class BasicRPCCommunicat
* Only constructor.
*
* @param context Context for getting configuration
+ * @param configuration Configuration
* @param service Service worker to get the vertex ranges
* @throws IOException
- * @throws UnknownHostException
* @throws InterruptedException
*/
public BasicRPCCommunications(Mapper<?, ?, ?, ?>.Context context,
+ ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
CentralizedServiceWorker<I, V, E, M> service)
throws IOException, InterruptedException {
this.service = service;
this.context = context;
- this.conf = context.getConfiguration();
- this.maxSize = conf.getInt(GiraphJob.MSG_SIZE,
- GiraphJob.MSG_SIZE_DEFAULT);
+ this.conf = configuration;
+ this.maxSize = conf.getInt(GiraphConfiguration.MSG_SIZE,
+ GiraphConfiguration.MSG_SIZE_DEFAULT);
this.maxMessagesPerFlushPut =
- conf.getInt(GiraphJob.MAX_MESSAGES_PER_FLUSH_PUT,
- GiraphJob.DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT);
+ conf.getInt(GiraphConfiguration.MAX_MESSAGES_PER_FLUSH_PUT,
+ GiraphConfiguration.DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT);
if (BspUtils.getVertexCombinerClass(conf) == null) {
this.combiner = null;
} else {
@@ -466,19 +467,19 @@ public abstract class BasicRPCCommunicat
- int numHandlers = conf.getInt(GiraphJob.RPC_NUM_HANDLERS,
- GiraphJob.RPC_NUM_HANDLERS_DEFAULT);
+ int numHandlers = conf.getInt(GiraphConfiguration.RPC_NUM_HANDLERS,
+ GiraphConfiguration.RPC_NUM_HANDLERS_DEFAULT);
if (numTasks < numHandlers) {
numHandlers = numTasks;
}
this.jobToken = createJobToken();
this.jobId = context.getJobID().toString();
- int numWorkers = conf.getInt(GiraphJob.MAX_WORKERS, numTasks);
+ int numWorkers = conf.getInt(GiraphConfiguration.MAX_WORKERS, numTasks);
// If the number of flush threads is unset, it is set to
// the number of max workers - 1 or a minimum of 1.
int numFlushThreads =
- Math.max(conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+ Math.max(conf.getInt(GiraphConfiguration.MSG_NUM_FLUSH_THREADS,
numWorkers - 1),
1);
this.executor = Executors.newFixedThreadPool(numFlushThreads);
@@ -490,16 +491,16 @@ public abstract class BasicRPCCommunicat
int portIncrementConstant =
(int) Math.pow(10, Math.ceil(Math.log10(numWorkers)));
String bindAddress = localHostname;
- int bindPort = conf.getInt(GiraphJob.RPC_INITIAL_PORT,
- GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
+ int bindPort = conf.getInt(GiraphConfiguration.RPC_INITIAL_PORT,
+ GiraphConfiguration.RPC_INITIAL_PORT_DEFAULT) +
taskId;
int bindAttempts = 0;
final int maxRpcPortBindAttempts =
- conf.getInt(GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS,
- GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT);
+ conf.getInt(GiraphConfiguration.MAX_RPC_PORT_BIND_ATTEMPTS,
+ GiraphConfiguration.MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT);
final boolean failFirstPortBindingAttempt =
- conf.getBoolean(GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT,
- GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT);
+ conf.getBoolean(GiraphConfiguration.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT,
+ GiraphConfiguration.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT);
while (bindAttempts < maxRpcPortBindAttempts) {
this.myAddress = new InetSocketAddress(bindAddress, bindPort);
if (failFirstPortBindingAttempt && bindAttempts == 0) {
@@ -1216,8 +1217,7 @@ public abstract class BasicRPCCommunicat
// Resolve all graph mutations
for (I vertexIndex : resolveVertexIndexSet) {
VertexResolver<I, V, E, M> vertexResolver =
- BspUtils.createVertexResolver(
- conf, service.getGraphMapper().getGraphState());
+ conf.createVertexResolver(service.getGraphMapper().getGraphState());
Vertex<I, V, E, M> originalVertex =
service.getVertex(vertexIndex);
Iterable<M> messages = inMessages.get(vertexIndex);
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java Tue Sep 25 17:40:18 2012
@@ -39,6 +39,8 @@ import org.apache.log4j.Logger;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+
/*if[HADOOP_NON_SECURE]
else[HADOOP_NON_SECURE]*/
import org.apache.giraph.hadoop.BspPolicyProvider;
@@ -78,6 +80,7 @@ public class RPCCommunications<I extends
* Constructor.
*
* @param context Context to be saved.
+ * @param configuration Configuration
* @param service Server worker.
* @param graphState Graph state from infrastructure.
* @throws IOException
@@ -85,9 +88,10 @@ public class RPCCommunications<I extends
*/
public RPCCommunications(Mapper<?, ?, ?, ?>.Context context,
CentralizedServiceWorker<I, V, E, M> service,
+ ImmutableClassesGiraphConfiguration configuration,
GraphState<I, V, E, M> graphState) throws
IOException, InterruptedException {
- super(context, service);
+ super(context, configuration, service);
}
/**
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java Tue Sep 25 17:40:18 2012
@@ -24,9 +24,8 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.VertexCombiner;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -56,11 +55,11 @@ public class SendMessageCache<I extends
*
* @param conf Configuration used for instantiating the combiner.
*/
- public SendMessageCache(Configuration conf) {
- if (BspUtils.getVertexCombinerClass(conf) == null) {
+ public SendMessageCache(ImmutableClassesGiraphConfiguration conf) {
+ if (conf.getVertexCombinerClass() == null) {
this.combiner = null;
} else {
- this.combiner = BspUtils.createVertexCombiner(conf);
+ this.combiner = conf.createVertexCombiner();
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java Tue Sep 25 17:40:18 2012
@@ -18,14 +18,14 @@
package org.apache.giraph.comm;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.messages.MessageStoreByPartition;
import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.graph.partition.DiskBackedPartitionStore;
import org.apache.giraph.graph.partition.PartitionStore;
import org.apache.giraph.graph.partition.SimplePartitionStore;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -71,15 +71,16 @@ public class ServerData<I extends Writab
* @param configuration Configuration
* @param messageStoreFactory Factory for message stores
*/
- public ServerData(Configuration configuration,
- MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
- messageStoreFactory) {
+ public ServerData(
+ ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+ MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
+ messageStoreFactory) {
this.messageStoreFactory = messageStoreFactory;
currentMessageStore = messageStoreFactory.newStore();
incomingMessageStore = messageStoreFactory.newStore();
- if (configuration.getBoolean(GiraphJob.USE_OUT_OF_CORE_GRAPH,
- GiraphJob.USE_OUT_OF_CORE_GRAPH_DEFAULT)) {
+ if (configuration.getBoolean(GiraphConfiguration.USE_OUT_OF_CORE_GRAPH,
+ GiraphConfiguration.USE_OUT_OF_CORE_GRAPH_DEFAULT)) {
partitionStore = new DiskBackedPartitionStore<I, V, E, M>(configuration);
} else {
partitionStore = new SimplePartitionStore<I, V, E, M>(configuration);
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java Tue Sep 25 17:40:18 2012
@@ -18,10 +18,9 @@
package org.apache.giraph.comm.messages;
-import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.VertexCombiner;
import org.apache.giraph.utils.CollectionUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -55,7 +54,7 @@ public class DiskBackedMessageStore<I ex
/** In memory message map */
private volatile ConcurrentNavigableMap<I, Collection<M>> inMemoryMessages;
/** Hadoop configuration */
- private final Configuration config;
+ private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
/** Combiner for messages */
private final VertexCombiner<I, M> combiner;
/** Counter for number of messages in memory */
@@ -76,7 +75,7 @@ public class DiskBackedMessageStore<I ex
* @param fileStoreFactory Factory for creating file stores when flushing
*/
public DiskBackedMessageStore(VertexCombiner<I, M> combiner,
- Configuration config,
+ ImmutableClassesGiraphConfiguration<I, ?, ?, M> config,
MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
inMemoryMessages = new ConcurrentSkipListMap<I, Collection<M>>();
this.config = config;
@@ -211,7 +210,7 @@ public class DiskBackedMessageStore<I ex
// read destination vertices
int numVertices = in.readInt();
for (int v = 0; v < numVertices; v++) {
- I vertexId = BspUtils.<I>createVertexId(config);
+ I vertexId = (I) config.createVertexId();
vertexId.readFields(in);
destinationVertices.add(vertexId);
}
@@ -219,13 +218,13 @@ public class DiskBackedMessageStore<I ex
// read in memory map
int mapSize = in.readInt();
for (int m = 0; m < mapSize; m++) {
- I vertexId = BspUtils.<I>createVertexId(config);
+ I vertexId = config.createVertexId();
vertexId.readFields(in);
int numMessages = in.readInt();
numberOfMessagesInMemory.addAndGet(numMessages);
List<M> messages = Lists.newArrayList();
for (int i = 0; i < numMessages; i++) {
- M message = BspUtils.<M>createMessageValue(config);
+ M message = config.createMessageValue();
message.readFields(in);
messages.add(message);
}
@@ -254,7 +253,7 @@ public class DiskBackedMessageStore<I ex
*/
public static <I extends WritableComparable, M extends Writable>
MessageStoreFactory<I, M, FlushableMessageStore<I, M>> newFactory(
- Configuration config,
+ ImmutableClassesGiraphConfiguration<I, ?, ?, M> config,
MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
return new Factory<I, M>(config, fileStoreFactory);
}
@@ -269,7 +268,7 @@ public class DiskBackedMessageStore<I ex
M extends Writable> implements MessageStoreFactory<I, M,
FlushableMessageStore<I, M>> {
/** Hadoop configuration */
- private final Configuration config;
+ private final ImmutableClassesGiraphConfiguration config;
/** Combiner for messages */
private final VertexCombiner<I, M> combiner;
/** Factory for creating message stores for partitions */
@@ -281,13 +280,13 @@ public class DiskBackedMessageStore<I ex
* @param fileStoreFactory Factory for creating message stores for
* partitions
*/
- public Factory(Configuration config,
+ public Factory(ImmutableClassesGiraphConfiguration config,
MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
this.config = config;
- if (BspUtils.getVertexCombinerClass(config) == null) {
+ if (config.getVertexCombinerClass() == null) {
combiner = null;
} else {
- combiner = BspUtils.createVertexCombiner(config);
+ combiner = config.createVertexCombiner();
}
this.fileStoreFactory = fileStoreFactory;
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java Tue Sep 25 17:40:18 2012
@@ -18,9 +18,8 @@
package org.apache.giraph.comm.messages;
-import org.apache.giraph.graph.BspUtils;
-import org.apache.giraph.graph.GiraphJob;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -61,7 +60,7 @@ public class SequentialFileMessageStore<
/** File in which we store data */
private final File file;
/** Configuration which we need for reading data */
- private final Configuration config;
+ private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
/** Buffer size to use when reading and writing files */
private final int bufferSize;
/** File input stream */
@@ -79,7 +78,9 @@ public class SequentialFileMessageStore<
* @param fileName File in which we want to store messages
* @throws IOException
*/
- public SequentialFileMessageStore(Configuration config, int bufferSize,
+ public SequentialFileMessageStore(
+ ImmutableClassesGiraphConfiguration<I, ?, ?, M> config,
+ int bufferSize,
String fileName) {
this.config = config;
this.bufferSize = bufferSize;
@@ -248,7 +249,7 @@ public class SequentialFileMessageStore<
if (verticesLeft == 0) {
return null;
}
- currentVertexId = BspUtils.<I>createVertexId(config);
+ currentVertexId = config.createVertexId();
currentVertexId.readFields(in);
return currentVertexId;
}
@@ -263,7 +264,7 @@ public class SequentialFileMessageStore<
int messagesSize = in.readInt();
ArrayList<M> messages = Lists.newArrayList();
for (int i = 0; i < messagesSize; i++) {
- M message = BspUtils.<M>createMessageValue(config);
+ M message = config.createMessageValue();
message.readFields(in);
messages.add(message);
}
@@ -307,7 +308,7 @@ public class SequentialFileMessageStore<
*/
public static <I extends WritableComparable, M extends Writable>
MessageStoreFactory<I, M, BasicMessageStore<I, M>> newFactory(
- Configuration config) {
+ ImmutableClassesGiraphConfiguration config) {
return new Factory<I, M>(config);
}
@@ -321,7 +322,7 @@ public class SequentialFileMessageStore<
M extends Writable>
implements MessageStoreFactory<I, M, BasicMessageStore<I, M>> {
/** Hadoop configuration */
- private final Configuration config;
+ private final ImmutableClassesGiraphConfiguration config;
/** Directory in which we'll keep necessary files */
private final String directory;
/** Buffer size to use when reading and writing */
@@ -329,14 +330,19 @@ public class SequentialFileMessageStore<
/** Counter for created message stores */
private final AtomicInteger storeCounter;
- /** @param config Hadoop configuration */
- public Factory(Configuration config) {
+ /**
+ * Constructor.
+ *
+ * @param config Hadoop configuration
+ */
+ public Factory(ImmutableClassesGiraphConfiguration config) {
this.config = config;
String jobId = config.get("mapred.job.id", "Unknown Job");
- this.directory = config.get(GiraphJob.MESSAGES_DIRECTORY,
- GiraphJob.MESSAGES_DIRECTORY_DEFAULT) + jobId + File.separator;
- this.bufferSize = config.getInt(GiraphJob.MESSAGES_BUFFER_SIZE,
- GiraphJob.MESSAGES_BUFFER_SIZE_DEFAULT);
+ this.directory = config.get(GiraphConfiguration.MESSAGES_DIRECTORY,
+ GiraphConfiguration.MESSAGES_DIRECTORY_DEFAULT) + jobId +
+ File.separator;
+ this.bufferSize = config.getInt(GiraphConfiguration.MESSAGES_BUFFER_SIZE,
+ GiraphConfiguration.MESSAGES_BUFFER_SIZE_DEFAULT);
storeCounter = new AtomicInteger();
new File(directory).mkdirs();
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java Tue Sep 25 17:40:18 2012
@@ -18,11 +18,10 @@
package org.apache.giraph.comm.messages;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.VertexCombiner;
import org.apache.giraph.utils.CollectionUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -38,9 +37,11 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
+import org.apache.log4j.Logger;
/**
- * Simple in memory message store implemented with a map
+ * Simple in memory message store implemented with a two level concurrent
+ * hash map.
*
* @param <I> Vertex id
* @param <V> Vertex data
@@ -50,6 +51,8 @@ import java.util.concurrent.ConcurrentMa
public class SimpleMessageStore<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
implements MessageStoreByPartition<I, M> {
+ /** Class logger */
+ private static final Logger LOG = Logger.getLogger(SimpleMessageStore.class);
/** Service worker */
private final CentralizedServiceWorker<I, V, E, M> service;
/**
@@ -58,7 +61,7 @@ public class SimpleMessageStore<I extend
*/
private final ConcurrentMap<Integer, ConcurrentMap<I, Collection<M>>> map;
/** Hadoop configuration */
- private final Configuration config;
+ private final ImmutableClassesGiraphConfiguration<I, V, E, M> config;
/** Combiner for messages */
private final VertexCombiner<I, M> combiner;
@@ -68,7 +71,8 @@ public class SimpleMessageStore<I extend
* @param config Hadoop configuration
*/
SimpleMessageStore(CentralizedServiceWorker<I, V, E, M> service,
- VertexCombiner<I, M> combiner, Configuration config) {
+ VertexCombiner<I, M> combiner,
+ ImmutableClassesGiraphConfiguration<I, V, E, M> config) {
this.service = service;
map = Maps.newConcurrentMap();
this.combiner = combiner;
@@ -216,12 +220,12 @@ public class SimpleMessageStore<I extend
ConcurrentMap<I, Collection<M>> partitionMap = Maps.newConcurrentMap();
int numVertices = in.readInt();
for (int v = 0; v < numVertices; v++) {
- I vertexId = BspUtils.<I>createVertexId(config);
+ I vertexId = config.createVertexId();
vertexId.readFields(in);
int numMessages = in.readInt();
List<M> messages = Lists.newArrayList();
for (int m = 0; m < numMessages; m++) {
- M message = BspUtils.<M>createMessageValue(config);
+ M message = config.createMessageValue();
message.readFields(in);
messages.add(message);
}
@@ -255,7 +259,8 @@ public class SimpleMessageStore<I extend
public static <I extends WritableComparable, V extends Writable,
E extends Writable, M extends Writable>
MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
- CentralizedServiceWorker<I, V, E, M> service, Configuration config) {
+ CentralizedServiceWorker<I, V, E, M> service,
+ ImmutableClassesGiraphConfiguration<I, V, E, M> config) {
return new Factory<I, V, E, M>(service, config);
}
@@ -273,7 +278,7 @@ public class SimpleMessageStore<I extend
/** Service worker */
private final CentralizedServiceWorker<I, V, E, M> service;
/** Hadoop configuration */
- private final Configuration config;
+ private final ImmutableClassesGiraphConfiguration<I, V, E, M> config;
/** Combiner for messages */
private final VertexCombiner<I, M> combiner;
@@ -282,13 +287,13 @@ public class SimpleMessageStore<I extend
* @param config Hadoop configuration
*/
public Factory(CentralizedServiceWorker<I, V, E, M> service,
- Configuration config) {
+ ImmutableClassesGiraphConfiguration<I, V, E, M> config) {
this.service = service;
this.config = config;
- if (BspUtils.getVertexCombinerClass(config) == null) {
+ if (config.getVertexCombinerClass() == null) {
combiner = null;
} else {
- combiner = BspUtils.createVertexCombiner(config);
+ combiner = config.createVertexCombiner();
}
}