You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC
svn commit: r1245205 [10/18] - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/example...
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java Thu Feb 16 22:12:31 2012
@@ -31,424 +31,479 @@ import org.apache.hadoop.util.Reflection
* instantiate them.
*/
public class BspUtils {
- /**
- * Get the user's subclassed {@link GraphPartitionerFactory}.
- *
- * @param conf Configuration to check
- * @return User's graph partitioner
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable>
- Class<? extends GraphPartitionerFactory<I, V, E, M>>
- getGraphPartitionerClass(Configuration conf) {
- return (Class<? extends GraphPartitionerFactory<I, V, E, M>>)
- conf.getClass(GiraphJob.GRAPH_PARTITIONER_FACTORY_CLASS,
- HashPartitionerFactory.class,
- GraphPartitionerFactory.class);
- }
-
- /**
- * Create a user graph partitioner class
- *
- * @param conf Configuration to check
- * @return Instantiated user graph partitioner class
- */
- @SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable>
- GraphPartitionerFactory<I, V, E, M>
- createGraphPartitioner(Configuration conf) {
- Class<? extends GraphPartitionerFactory<I, V, E, M>>
- graphPartitionerFactoryClass =
- getGraphPartitionerClass(conf);
- return ReflectionUtils.newInstance(graphPartitionerFactoryClass, conf);
- }
-
- /**
- * Create a user graph partitioner partition stats class
- *
- * @param conf Configuration to check
- * @return Instantiated user graph partition stats class
- */
- @SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable>
- PartitionStats createGraphPartitionStats(Configuration conf) {
- GraphPartitionerFactory<I, V, E, M> graphPartitioner =
- createGraphPartitioner(conf);
- return graphPartitioner.createMasterGraphPartitioner().
- createPartitionStats();
- }
-
- /**
- * Get the user's subclassed {@link VertexInputFormat}.
- *
- * @param conf Configuration to check
- * @return User's vertex input format class
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable>
- Class<? extends VertexInputFormat<I, V, E, M>>
- getVertexInputFormatClass(Configuration conf) {
- return (Class<? extends VertexInputFormat<I, V, E, M>>)
- conf.getClass(GiraphJob.VERTEX_INPUT_FORMAT_CLASS,
- null,
- VertexInputFormat.class);
- }
-
- /**
- * Create a user vertex input format class
- *
- * @param conf Configuration to check
- * @return Instantiated user vertex input format class
- */
- @SuppressWarnings("rawtypes")
- public static <I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable> VertexInputFormat<I, V, E, M>
- createVertexInputFormat(Configuration conf) {
- Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass =
- getVertexInputFormatClass(conf);
- VertexInputFormat<I, V, E, M> inputFormat =
- ReflectionUtils.newInstance(vertexInputFormatClass, conf);
- return inputFormat;
- }
-
- /**
- * Get the user's subclassed {@link VertexOutputFormat}.
- *
- * @param conf Configuration to check
- * @return User's vertex output format class
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable,
- V extends Writable,
- E extends Writable>
- Class<? extends VertexOutputFormat<I, V, E>>
- getVertexOutputFormatClass(Configuration conf) {
- return (Class<? extends VertexOutputFormat<I, V, E>>)
- conf.getClass(GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS,
- null,
- VertexOutputFormat.class);
- }
-
- /**
- * Create a user vertex output format class
- *
- * @param conf Configuration to check
- * @return Instantiated user vertex output format class
- */
- @SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, V extends Writable,
- E extends Writable> VertexOutputFormat<I, V, E>
- createVertexOutputFormat(Configuration conf) {
- Class<? extends VertexOutputFormat<I, V, E>> vertexOutputFormatClass =
- getVertexOutputFormatClass(conf);
- return ReflectionUtils.newInstance(vertexOutputFormatClass, conf);
- }
-
- /**
- * Get the user's subclassed {@link AggregatorWriter}.
- *
- * @param conf Configuration to check
- * @return User's aggregator writer class
- */
- public static Class<? extends AggregatorWriter>
- getAggregatorWriterClass(Configuration conf) {
- return conf.getClass(GiraphJob.AGGREGATOR_WRITER_CLASS,
- TextAggregatorWriter.class,
- AggregatorWriter.class);
- }
-
- /**
- * Create a user aggregator output format class
- *
- * @param conf Configuration to check
- * @return Instantiated user aggregator writer class
- */
- public static AggregatorWriter
- createAggregatorWriter(Configuration conf) {
- Class<? extends AggregatorWriter> aggregatorWriterClass =
- getAggregatorWriterClass(conf);
- return ReflectionUtils.newInstance(aggregatorWriterClass, conf);
- }
-
- /**
- * Get the user's subclassed {@link VertexCombiner}.
- *
- * @param conf Configuration to check
- * @return User's vertex combiner class
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable,
- M extends Writable>
- Class<? extends VertexCombiner<I, M>>
- getVertexCombinerClass(Configuration conf) {
- return (Class<? extends VertexCombiner<I, M>>)
- conf.getClass(GiraphJob.VERTEX_COMBINER_CLASS,
- null,
- VertexCombiner.class);
- }
-
- /**
- * Create a user vertex combiner class
- *
- * @param conf Configuration to check
- * @return Instantiated user vertex combiner class
- */
- @SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, M extends Writable>
- VertexCombiner<I, M> createVertexCombiner(Configuration conf) {
- Class<? extends VertexCombiner<I, M>> vertexCombinerClass =
- getVertexCombinerClass(conf);
- return ReflectionUtils.newInstance(vertexCombinerClass, conf);
- }
-
- /**
- * Get the user's subclassed VertexResolver.
- *
- * @param conf Configuration to check
- * @return User's vertex resolver class
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public static <I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable>
- Class<? extends VertexResolver<I, V, E, M>>
- getVertexResolverClass(Configuration conf) {
- return (Class<? extends VertexResolver<I, V, E, M>>)
- conf.getClass(GiraphJob.VERTEX_RESOLVER_CLASS,
- VertexResolver.class,
- VertexResolver.class);
- }
-
- /**
- * Create a user vertex revolver
- *
- * @param conf Configuration to check
- * @return Instantiated user vertex resolver
- */
- @SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable> VertexResolver<I, V, E, M>
- createVertexResolver(Configuration conf,
- GraphState<I, V, E, M> graphState) {
- Class<? extends VertexResolver<I, V, E, M>> vertexResolverClass =
- getVertexResolverClass(conf);
- VertexResolver<I, V, E, M> resolver =
- ReflectionUtils.newInstance(vertexResolverClass, conf);
- resolver.setGraphState(graphState);
- return resolver;
- }
-
- /**
- * Get the user's subclassed WorkerContext.
- *
- * @param conf Configuration to check
- * @return User's worker context class
- */
- public static Class<? extends WorkerContext>
- getWorkerContextClass(Configuration conf) {
- return (Class<? extends WorkerContext>)
- conf.getClass(GiraphJob.WORKER_CONTEXT_CLASS,
- DefaultWorkerContext.class,
- WorkerContext.class);
- }
-
- /**
- * Create a user worker context
- *
- * @param conf Configuration to check
- * @return Instantiated user worker context
- */
- @SuppressWarnings("rawtypes")
- public static <I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable>
- WorkerContext createWorkerContext(Configuration conf,
- GraphState<I, V, E, M> graphState) {
- Class<? extends WorkerContext> workerContextClass =
- getWorkerContextClass(conf);
- WorkerContext workerContext =
- ReflectionUtils.newInstance(workerContextClass, conf);
- workerContext.setGraphState(graphState);
- return workerContext;
- }
-
-
- /**
- * Get the user's subclassed {@link BasicVertex}
- *
- * @param conf Configuration to check
- * @return User's vertex class
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable>
- Class<? extends BasicVertex<I, V, E, M>>
- getVertexClass(Configuration conf) {
- return (Class<? extends BasicVertex<I, V, E, M>>)
- conf.getClass(GiraphJob.VERTEX_CLASS,
- null,
- BasicVertex.class);
- }
-
- /**
- * Create a user vertex
- *
- * @param conf Configuration to check
- * @return Instantiated user vertex
- */
- @SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable> BasicVertex<I, V, E, M>
- createVertex(Configuration conf) {
- Class<? extends BasicVertex<I, V, E, M>> vertexClass =
- getVertexClass(conf);
- BasicVertex<I, V, E, M> vertex =
- ReflectionUtils.newInstance(vertexClass, conf);
- return vertex;
- }
-
- /**
- * Get the user's subclassed vertex index class.
- *
- * @param conf Configuration to check
- * @return User's vertex index class
- */
- @SuppressWarnings("unchecked")
- public static <I extends Writable> Class<I>
- getVertexIndexClass(Configuration conf) {
- return (Class<I>) conf.getClass(GiraphJob.VERTEX_INDEX_CLASS,
- WritableComparable.class);
- }
-
- /**
- * Create a user vertex index
- *
- * @param conf Configuration to check
- * @return Instantiated user vertex index
- */
- @SuppressWarnings("rawtypes")
- public static <I extends WritableComparable>
- I createVertexIndex(Configuration conf) {
- Class<I> vertexClass = getVertexIndexClass(conf);
- try {
- return vertexClass.newInstance();
- } catch (InstantiationException e) {
- throw new IllegalArgumentException(
- "createVertexIndex: Failed to instantiate", e);
- } catch (IllegalAccessException e) {
- throw new IllegalArgumentException(
- "createVertexIndex: Illegally accessed", e);
- }
- }
-
- /**
- * Get the user's subclassed vertex value class.
- *
- * @param conf Configuration to check
- * @return User's vertex value class
- */
- @SuppressWarnings("unchecked")
- public static <V extends Writable> Class<V>
- getVertexValueClass(Configuration conf) {
- return (Class<V>) conf.getClass(GiraphJob.VERTEX_VALUE_CLASS,
- Writable.class);
- }
-
- /**
- * Create a user vertex value
- *
- * @param conf Configuration to check
- * @return Instantiated user vertex value
- */
- public static <V extends Writable> V
- createVertexValue(Configuration conf) {
- Class<V> vertexValueClass = getVertexValueClass(conf);
- try {
- return vertexValueClass.newInstance();
- } catch (InstantiationException e) {
- throw new IllegalArgumentException(
- "createVertexValue: Failed to instantiate", e);
- } catch (IllegalAccessException e) {
- throw new IllegalArgumentException(
- "createVertexValue: Illegally accessed", e);
- }
- }
-
- /**
- * Get the user's subclassed edge value class.
- *
- * @param conf Configuration to check
- * @return User's vertex edge value class
- */
- @SuppressWarnings("unchecked")
- public static <E extends Writable> Class<E>
- getEdgeValueClass(Configuration conf){
- return (Class<E>) conf.getClass(GiraphJob.EDGE_VALUE_CLASS,
- Writable.class);
- }
-
- /**
- * Create a user edge value
- *
- * @param conf Configuration to check
- * @return Instantiated user edge value
- */
- public static <E extends Writable> E
- createEdgeValue(Configuration conf) {
- Class<E> edgeValueClass = getEdgeValueClass(conf);
- try {
- return edgeValueClass.newInstance();
- } catch (InstantiationException e) {
- throw new IllegalArgumentException(
- "createEdgeValue: Failed to instantiate", e);
- } catch (IllegalAccessException e) {
- throw new IllegalArgumentException(
- "createEdgeValue: Illegally accessed", e);
- }
- }
-
- /**
- * Get the user's subclassed vertex message value class.
- *
- * @param conf Configuration to check
- * @return User's vertex message value class
- */
- @SuppressWarnings("unchecked")
- public static <M extends Writable> Class<M>
- getMessageValueClass(Configuration conf) {
- return (Class<M>) conf.getClass(GiraphJob.MESSAGE_VALUE_CLASS,
- Writable.class);
- }
-
- /**
- * Create a user vertex message value
- *
- * @param conf Configuration to check
- * @return Instantiated user vertex message value
- */
- public static <M extends Writable> M
- createMessageValue(Configuration conf) {
- Class<M> messageValueClass = getMessageValueClass(conf);
- try {
- return messageValueClass.newInstance();
- } catch (InstantiationException e) {
- throw new IllegalArgumentException(
- "createMessageValue: Failed to instantiate", e);
- } catch (IllegalAccessException e) {
- throw new IllegalArgumentException(
- "createMessageValue: Illegally accessed", e);
- }
+ /**
+ * Do not construct.
+ */
+ private BspUtils() { }
+
+ /**
+ * Get the user's subclassed {@link GraphPartitionerFactory}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ * @param conf Configuration to check
+ * @return User's graph partitioner
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable>
+ Class<? extends GraphPartitionerFactory<I, V, E, M>>
+ getGraphPartitionerClass(Configuration conf) {
+ return (Class<? extends GraphPartitionerFactory<I, V, E, M>>)
+ conf.getClass(GiraphJob.GRAPH_PARTITIONER_FACTORY_CLASS,
+ HashPartitionerFactory.class,
+ GraphPartitionerFactory.class);
+ }
+
+ /**
+ * Create a user graph partitioner class
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ * @param conf Configuration to check
+ * @return Instantiated user graph partitioner class
+ */
+ @SuppressWarnings("rawtypes")
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable>
+ GraphPartitionerFactory<I, V, E, M>
+ createGraphPartitioner(Configuration conf) {
+ Class<? extends GraphPartitionerFactory<I, V, E, M>>
+ graphPartitionerFactoryClass = getGraphPartitionerClass(conf);
+ return ReflectionUtils.newInstance(graphPartitionerFactoryClass, conf);
+ }
+
+ /**
+ * Create a user graph partitioner partition stats class
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ * @param conf Configuration to check
+ * @return Instantiated user graph partition stats class
+ */
+ @SuppressWarnings("rawtypes")
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable>
+ PartitionStats createGraphPartitionStats(Configuration conf) {
+ GraphPartitionerFactory<I, V, E, M> graphPartitioner =
+ createGraphPartitioner(conf);
+ return graphPartitioner.createMasterGraphPartitioner().
+ createPartitionStats();
+ }
+
+ /**
+ * Get the user's subclassed {@link VertexInputFormat}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ * @param conf Configuration to check
+ * @return User's vertex input format class
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static <I extends WritableComparable,
+ V extends Writable,
+ E extends Writable,
+ M extends Writable>
+ Class<? extends VertexInputFormat<I, V, E, M>>
+ getVertexInputFormatClass(Configuration conf) {
+ return (Class<? extends VertexInputFormat<I, V, E, M>>)
+ conf.getClass(GiraphJob.VERTEX_INPUT_FORMAT_CLASS,
+ null,
+ VertexInputFormat.class);
+ }
+
+ /**
+ * Create a user vertex input format class
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ * @param conf Configuration to check
+ * @return Instantiated user vertex input format class
+ */
+ @SuppressWarnings("rawtypes")
+ public static <I extends WritableComparable,
+ V extends Writable,
+ E extends Writable,
+ M extends Writable> VertexInputFormat<I, V, E, M>
+ createVertexInputFormat(Configuration conf) {
+ Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass =
+ getVertexInputFormatClass(conf);
+ VertexInputFormat<I, V, E, M> inputFormat =
+ ReflectionUtils.newInstance(vertexInputFormatClass, conf);
+ return inputFormat;
+ }
+
+ /**
+ * Get the user's subclassed {@link VertexOutputFormat}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param conf Configuration to check
+ * @return User's vertex output format class
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static <I extends WritableComparable,
+ V extends Writable,
+ E extends Writable>
+ Class<? extends VertexOutputFormat<I, V, E>>
+ getVertexOutputFormatClass(Configuration conf) {
+ return (Class<? extends VertexOutputFormat<I, V, E>>)
+ conf.getClass(GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS,
+ null,
+ VertexOutputFormat.class);
+ }
+
+ /**
+ * Create a user vertex output format class
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param conf Configuration to check
+ * @return Instantiated user vertex output format class
+ */
+ @SuppressWarnings("rawtypes")
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable> VertexOutputFormat<I, V, E>
+ createVertexOutputFormat(Configuration conf) {
+ Class<? extends VertexOutputFormat<I, V, E>> vertexOutputFormatClass =
+ getVertexOutputFormatClass(conf);
+ return ReflectionUtils.newInstance(vertexOutputFormatClass, conf);
+ }
+
+ /**
+ * Get the user's subclassed {@link AggregatorWriter}.
+ *
+ * @param conf Configuration to check
+ * @return User's aggregator writer class
+ */
+ public static Class<? extends AggregatorWriter>
+ getAggregatorWriterClass(Configuration conf) {
+ return conf.getClass(GiraphJob.AGGREGATOR_WRITER_CLASS,
+ TextAggregatorWriter.class,
+ AggregatorWriter.class);
+ }
+
+ /**
+ * Create a user aggregator writer
+ *
+ * @param conf Configuration to check
+ * @return Instantiated user aggregator writer class
+ */
+ public static AggregatorWriter createAggregatorWriter(Configuration conf) {
+ Class<? extends AggregatorWriter> aggregatorWriterClass =
+ getAggregatorWriterClass(conf);
+ return ReflectionUtils.newInstance(aggregatorWriterClass, conf);
+ }
+
+ /**
+ * Get the user's subclassed {@link VertexCombiner}.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ * @param conf Configuration to check
+ * @return User's vertex combiner class
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static <I extends WritableComparable, M extends Writable>
+ Class<? extends VertexCombiner<I, M>>
+ getVertexCombinerClass(Configuration conf) {
+ return (Class<? extends VertexCombiner<I, M>>)
+ conf.getClass(GiraphJob.VERTEX_COMBINER_CLASS,
+ null,
+ VertexCombiner.class);
+ }
+
+ /**
+ * Create a user vertex combiner class
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ * @param conf Configuration to check
+ * @return Instantiated user vertex combiner class
+ */
+ @SuppressWarnings("rawtypes")
+ public static <I extends WritableComparable, M extends Writable>
+ VertexCombiner<I, M> createVertexCombiner(Configuration conf) {
+ Class<? extends VertexCombiner<I, M>> vertexCombinerClass =
+ getVertexCombinerClass(conf);
+ return ReflectionUtils.newInstance(vertexCombinerClass, conf);
+ }
+
+ /**
+ * Get the user's subclassed VertexResolver.
+ *
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ * @param conf Configuration to check
+ * @return User's vertex resolver class
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable>
+ Class<? extends VertexResolver<I, V, E, M>>
+ getVertexResolverClass(Configuration conf) {
+ return (Class<? extends VertexResolver<I, V, E, M>>)
+ conf.getClass(GiraphJob.VERTEX_RESOLVER_CLASS,
+ VertexResolver.class,
+ VertexResolver.class);
+ }
+
+ /**
+ * Create a user vertex resolver
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ * @param conf Configuration to check
+ * @param graphState State of the graph from the worker
+ * @return Instantiated user vertex resolver
+ */
+ @SuppressWarnings("rawtypes")
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable> VertexResolver<I, V, E, M>
+ createVertexResolver(Configuration conf,
+ GraphState<I, V, E, M> graphState) {
+ Class<? extends VertexResolver<I, V, E, M>> vertexResolverClass =
+ getVertexResolverClass(conf);
+ VertexResolver<I, V, E, M> resolver =
+ ReflectionUtils.newInstance(vertexResolverClass, conf);
+ resolver.setGraphState(graphState);
+ return resolver;
+ }
+
+ /**
+ * Get the user's subclassed WorkerContext.
+ *
+ * @param conf Configuration to check
+ * @return User's worker context class
+ */
+ public static Class<? extends WorkerContext>
+ getWorkerContextClass(Configuration conf) {
+ return (Class<? extends WorkerContext>)
+ conf.getClass(GiraphJob.WORKER_CONTEXT_CLASS,
+ DefaultWorkerContext.class,
+ WorkerContext.class);
+ }
+
+ /**
+ * Create a user worker context
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ * @param conf Configuration to check
+ * @param graphState State of the graph from the worker
+ * @return Instantiated user worker context
+ */
+ @SuppressWarnings("rawtypes")
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable>
+ WorkerContext createWorkerContext(Configuration conf,
+ GraphState<I, V, E, M> graphState) {
+ Class<? extends WorkerContext> workerContextClass =
+ getWorkerContextClass(conf);
+ WorkerContext workerContext =
+ ReflectionUtils.newInstance(workerContextClass, conf);
+ workerContext.setGraphState(graphState);
+ return workerContext;
+ }
+
+
+ /**
+ * Get the user's subclassed {@link BasicVertex}
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ * @param conf Configuration to check
+ * @return User's vertex class
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable>
+ Class<? extends BasicVertex<I, V, E, M>> getVertexClass(Configuration conf) {
+ return (Class<? extends BasicVertex<I, V, E, M>>)
+ conf.getClass(GiraphJob.VERTEX_CLASS,
+ null,
+ BasicVertex.class);
+ }
+
+ /**
+ * Create a user vertex
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ * @param conf Configuration to check
+ * @return Instantiated user vertex
+ */
+ @SuppressWarnings("rawtypes")
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable> BasicVertex<I, V, E, M>
+ createVertex(Configuration conf) {
+ Class<? extends BasicVertex<I, V, E, M>> vertexClass = getVertexClass(conf);
+ BasicVertex<I, V, E, M> vertex =
+ ReflectionUtils.newInstance(vertexClass, conf);
+ return vertex;
+ }
+
+ /**
+ * Get the user's subclassed vertex index class.
+ *
+ * @param <I> Vertex id
+ * @param conf Configuration to check
+ * @return User's vertex index class
+ */
+ @SuppressWarnings("unchecked")
+ public static <I extends Writable> Class<I>
+ getVertexIndexClass(Configuration conf) {
+ return (Class<I>) conf.getClass(GiraphJob.VERTEX_INDEX_CLASS,
+ WritableComparable.class);
+ }
+
+ /**
+ * Create a user vertex index
+ *
+ * @param <I> Vertex id
+ * @param conf Configuration to check
+ * @return Instantiated user vertex index
+ */
+ @SuppressWarnings("rawtypes")
+ public static <I extends WritableComparable>
+ I createVertexIndex(Configuration conf) {
+ Class<I> vertexClass = getVertexIndexClass(conf);
+ try {
+ return vertexClass.newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException(
+ "createVertexIndex: Failed to instantiate", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException(
+ "createVertexIndex: Illegally accessed", e);
+ }
+ }
+
+ /**
+ * Get the user's subclassed vertex value class.
+ *
+ * @param <V> Vertex data
+ * @param conf Configuration to check
+ * @return User's vertex value class
+ */
+ @SuppressWarnings("unchecked")
+ public static <V extends Writable> Class<V>
+ getVertexValueClass(Configuration conf) {
+ return (Class<V>) conf.getClass(GiraphJob.VERTEX_VALUE_CLASS,
+ Writable.class);
+ }
+
+ /**
+ * Create a user vertex value
+ *
+ * @param <V> Vertex data
+ * @param conf Configuration to check
+ * @return Instantiated user vertex value
+ */
+ public static <V extends Writable> V
+ createVertexValue(Configuration conf) {
+ Class<V> vertexValueClass = getVertexValueClass(conf);
+ try {
+ return vertexValueClass.newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException(
+ "createVertexValue: Failed to instantiate", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException(
+ "createVertexValue: Illegally accessed", e);
+ }
+ }
+
+ /**
+ * Get the user's subclassed edge value class.
+ *
+ * @param <E> Edge data
+ * @param conf Configuration to check
+ * @return User's edge value class
+ */
+ @SuppressWarnings("unchecked")
+ public static <E extends Writable> Class<E>
+ getEdgeValueClass(Configuration conf) {
+ return (Class<E>) conf.getClass(GiraphJob.EDGE_VALUE_CLASS,
+ Writable.class);
+ }
+
+ /**
+ * Create a user edge value
+ *
+ * @param <E> Edge data
+ * @param conf Configuration to check
+ * @return Instantiated user edge value
+ */
+ public static <E extends Writable> E
+ createEdgeValue(Configuration conf) {
+ Class<E> edgeValueClass = getEdgeValueClass(conf);
+ try {
+ return edgeValueClass.newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException(
+ "createEdgeValue: Failed to instantiate", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException(
+ "createEdgeValue: Illegally accessed", e);
+ }
+ }
+
+ /**
+ * Get the user's subclassed vertex message value class.
+ *
+ * @param <M> Message data
+ * @param conf Configuration to check
+ * @return User's vertex message value class
+ */
+ @SuppressWarnings("unchecked")
+ public static <M extends Writable> Class<M>
+ getMessageValueClass(Configuration conf) {
+ return (Class<M>) conf.getClass(GiraphJob.MESSAGE_VALUE_CLASS,
+ Writable.class);
+ }
+
+ /**
+ * Create a user vertex message value
+ *
+ * @param <M> Message data
+ * @param conf Configuration to check
+ * @return Instantiated user vertex message value
+ */
+ public static <M extends Writable> M
+ createMessageValue(Configuration conf) {
+ Class<M> messageValueClass = getMessageValueClass(conf);
+ try {
+ return messageValueClass.newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException(
+ "createMessageValue: Failed to instantiate", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException(
+ "createMessageValue: Illegally accessed", e);
}
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java Thu Feb 16 22:12:31 2012
@@ -25,17 +25,17 @@ package org.apache.giraph.graph;
*/
public class DefaultWorkerContext extends WorkerContext {
- @Override
- public void preApplication() throws InstantiationException,
- IllegalAccessException {
- }
+ @Override
+ public void preApplication()
+ throws InstantiationException, IllegalAccessException {
+ }
- @Override
- public void postApplication() { }
+ @Override
+ public void postApplication() { }
- @Override
- public void preSuperstep() { }
+ @Override
+ public void preSuperstep() { }
- @Override
- public void postSuperstep() { }
-}
\ No newline at end of file
+ @Override
+ public void postSuperstep() { }
+}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java Thu Feb 16 22:12:31 2012
@@ -36,133 +36,138 @@ import java.io.IOException;
*/
@SuppressWarnings("rawtypes")
public class Edge<I extends WritableComparable, E extends Writable>
- implements WritableComparable<Edge<I, E>>, Configurable {
- /** Destination vertex id */
- private I destVertexId = null;
- /** Edge value */
- private E edgeValue = null;
- /** Configuration - Used to instantiate classes */
- private Configuration conf = null;
-
- /**
- * Constructor for reflection
- */
- public Edge() {}
-
- /**
- * Create the edge with final values
- *
- * @param destVertexId
- * @param edgeValue
- */
- public Edge(I destVertexId, E edgeValue) {
- this.destVertexId = destVertexId;
- this.edgeValue = edgeValue;
- }
-
- /**
- * Get the destination vertex index of this edge
- *
- * @return Destination vertex index of this edge
- */
- public I getDestVertexId() {
- return destVertexId;
- }
-
- /**
- * Get the edge value of the edge
- *
- * @return Edge value of this edge
- */
- public E getEdgeValue() {
- return edgeValue;
- }
-
- /**
- * Set the destination vertex index of this edge.
- *
- * @param destVertexId new destination vertex
- */
- public void setDestVertexId(I destVertexId) {
- this.destVertexId = destVertexId;
- }
-
- /**
- * Set the value for this edge.
- *
- * @param edgeValue new edge value
- */
- public void setEdgeValue(E edgeValue) {
- this.edgeValue = edgeValue;
- }
-
- @Override
- public String toString() {
- return "(DestVertexIndex = " + destVertexId +
- ", edgeValue = " + edgeValue + ")";
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void readFields(DataInput input) throws IOException {
- destVertexId = (I) BspUtils.createVertexIndex(getConf());
- destVertexId.readFields(input);
- edgeValue = (E) BspUtils.createEdgeValue(getConf());
- edgeValue.readFields(input);
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- if (destVertexId == null) {
- throw new IllegalStateException(
- "write: Null destination vertex index");
- }
- if (edgeValue == null) {
- throw new IllegalStateException(
- "write: Null edge value");
- }
- destVertexId.write(output);
- edgeValue.write(output);
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public int compareTo(Edge<I, E> edge) {
- return destVertexId.compareTo(edge.getDestVertexId());
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) { return true; }
- if (o == null || getClass() != o.getClass()) { return false; }
-
- Edge edge = (Edge) o;
-
- if (destVertexId != null ? !destVertexId.equals(edge.destVertexId) :
- edge.destVertexId != null) {
- return false;
- }
- if (edgeValue != null ? !edgeValue.equals(edge.edgeValue) : edge.edgeValue != null) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = destVertexId != null ? destVertexId.hashCode() : 0;
- result = 31 * result + (edgeValue != null ? edgeValue.hashCode() : 0);
- return result;
- }
+ implements WritableComparable<Edge<I, E>>, Configurable {
+ /** Destination vertex id */
+ private I destVertexId = null;
+ /** Edge value */
+ private E edgeValue = null;
+ /** Configuration - Used to instantiate classes */
+ private Configuration conf = null;
+
+ /**
+ * Constructor for reflection
+ */
+ public Edge() { }
+
+ /**
+ * Create the edge with final values
+ *
+ * @param destVertexId Destination vertex id.
+ * @param edgeValue Value of the edge.
+ */
+ public Edge(I destVertexId, E edgeValue) {
+ this.destVertexId = destVertexId;
+ this.edgeValue = edgeValue;
+ }
+
+ /**
+ * Get the destination vertex index of this edge
+ *
+ * @return Destination vertex index of this edge
+ */
+ public I getDestVertexId() {
+ return destVertexId;
+ }
+
+ /**
+ * Get the edge value of the edge
+ *
+ * @return Edge value of this edge
+ */
+ public E getEdgeValue() {
+ return edgeValue;
+ }
+
+ /**
+ * Set the destination vertex index of this edge.
+ *
+ * @param destVertexId new destination vertex
+ */
+ public void setDestVertexId(I destVertexId) {
+ this.destVertexId = destVertexId;
+ }
+
+ /**
+ * Set the value for this edge.
+ *
+ * @param edgeValue new edge value
+ */
+ public void setEdgeValue(E edgeValue) {
+ this.edgeValue = edgeValue;
+ }
+
+ @Override
+ public String toString() {
+ return "(DestVertexIndex = " + destVertexId +
+ ", edgeValue = " + edgeValue + ")";
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ destVertexId = (I) BspUtils.createVertexIndex(getConf());
+ destVertexId.readFields(input);
+ edgeValue = (E) BspUtils.createEdgeValue(getConf());
+ edgeValue.readFields(input);
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ if (destVertexId == null) {
+ throw new IllegalStateException(
+ "write: Null destination vertex index");
+ }
+ if (edgeValue == null) {
+ throw new IllegalStateException(
+ "write: Null edge value");
+ }
+ destVertexId.write(output);
+ edgeValue.write(output);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public int compareTo(Edge<I, E> edge) {
+ return destVertexId.compareTo(edge.getDestVertexId());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ Edge edge = (Edge) o;
+
+ if (destVertexId != null ? !destVertexId.equals(edge.destVertexId) :
+ edge.destVertexId != null) {
+ return false;
+ }
+ if (edgeValue != null ?
+ !edgeValue.equals(edge.edgeValue) : edge.edgeValue != null) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = destVertexId != null ? destVertexId.hashCode() : 0;
+ result = 31 * result + (edgeValue != null ? edgeValue.hashCode() : 0);
+ return result;
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java Thu Feb 16 22:12:31 2012
@@ -48,265 +48,266 @@ import java.util.Map;
*/
@SuppressWarnings("rawtypes")
public abstract class EdgeListVertex<I extends WritableComparable,
- V extends Writable,
- E extends Writable, M extends Writable>
- extends MutableVertex<I, V, E, M> {
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(EdgeListVertex.class);
- /** Vertex id */
- private I vertexId = null;
- /** Vertex value */
- private V vertexValue = null;
- /** List of the dest edge indices */
- private List<I> destEdgeIndexList;
- /** List of the dest edge values */
- /** Map of destination vertices and their edge values */
- private List<E> destEdgeValueList;
- /** List of incoming messages from the previous superstep */
- private List<M> msgList;
-
- @Override
- public void initialize(I vertexId, V vertexValue,
- Map<I, E> edges,
- Iterable<M> messages) {
- if (vertexId != null) {
- setVertexId(vertexId);
- }
- if (vertexValue != null) {
- setVertexValue(vertexValue);
- }
- if (edges != null && !edges.isEmpty()) {
- destEdgeIndexList = Lists.newArrayListWithCapacity(edges.size());
- destEdgeValueList = Lists.newArrayListWithCapacity(edges.size());
- List<I> sortedIndexList = new ArrayList<I>(edges.keySet());
- Collections.sort(sortedIndexList, new VertexIdComparator());
- for (I index : sortedIndexList) {
- destEdgeIndexList.add(index);
- destEdgeValueList.add(edges.get(index));
- }
- sortedIndexList.clear();
- } else {
- destEdgeIndexList = Lists.newArrayListWithCapacity(0);
- destEdgeValueList = Lists.newArrayListWithCapacity(0);
- }
- if (messages != null) {
- msgList = Lists.newArrayListWithCapacity(Iterables.size(messages));
- Iterables.<M>addAll(msgList, messages);
- } else {
- msgList = Lists.newArrayListWithCapacity(0);
- }
- }
-
- @Override
- public boolean equals(Object other) {
- if (other instanceof EdgeListVertex) {
- @SuppressWarnings("unchecked")
- EdgeListVertex<I, V, E, M> otherVertex = (EdgeListVertex) other;
- if (!getVertexId().equals(otherVertex.getVertexId())) {
- return false;
- }
- if (!getVertexValue().equals(otherVertex.getVertexValue())) {
- return false;
- }
- if (!ComparisonUtils.equal(getMessages(),
- otherVertex.getMessages())) {
- return false;
- }
- return ComparisonUtils.equal(iterator(), otherVertex.iterator());
- }
+ V extends Writable, E extends Writable, M extends Writable>
+ extends MutableVertex<I, V, E, M> {
+ /** Class logger */
+ private static final Logger LOG = Logger.getLogger(EdgeListVertex.class);
+ /** Vertex id */
+ private I vertexId = null;
+ /** Vertex value */
+ private V vertexValue = null;
+ /** List of the dest edge indices */
+ private List<I> destEdgeIndexList;
+ /** List of the dest edge values */
+ private List<E> destEdgeValueList;
+ /** List of incoming messages from the previous superstep */
+ private List<M> msgList;
+
+ @Override
+ public void initialize(I vertexId, V vertexValue,
+ Map<I, E> edges,
+ Iterable<M> messages) {
+ if (vertexId != null) {
+ setVertexId(vertexId);
+ }
+ if (vertexValue != null) {
+ setVertexValue(vertexValue);
+ }
+ if (edges != null && !edges.isEmpty()) {
+ destEdgeIndexList = Lists.newArrayListWithCapacity(edges.size());
+ destEdgeValueList = Lists.newArrayListWithCapacity(edges.size());
+ List<I> sortedIndexList = new ArrayList<I>(edges.keySet());
+ Collections.sort(sortedIndexList, new VertexIdComparator());
+ for (I index : sortedIndexList) {
+ destEdgeIndexList.add(index);
+ destEdgeValueList.add(edges.get(index));
+ }
+ sortedIndexList.clear();
+ } else {
+ destEdgeIndexList = Lists.newArrayListWithCapacity(0);
+ destEdgeValueList = Lists.newArrayListWithCapacity(0);
+ }
+ if (messages != null) {
+ msgList = Lists.newArrayListWithCapacity(Iterables.size(messages));
+ Iterables.<M>addAll(msgList, messages);
+ } else {
+ msgList = Lists.newArrayListWithCapacity(0);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return vertexId.hashCode() * 37 + vertexValue.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof EdgeListVertex) {
+ @SuppressWarnings("unchecked")
+ EdgeListVertex<I, V, E, M> otherVertex = (EdgeListVertex) other;
+ if (!getVertexId().equals(otherVertex.getVertexId())) {
return false;
+ }
+ if (!getVertexValue().equals(otherVertex.getVertexValue())) {
+ return false;
+ }
+ if (!ComparisonUtils.equal(getMessages(),
+ otherVertex.getMessages())) {
+ return false;
+ }
+ return ComparisonUtils.equal(iterator(), otherVertex.iterator());
}
+ return false;
+ }
- /**
- * Comparator for the vertex id
- */
- private class VertexIdComparator implements Comparator<I> {
- @SuppressWarnings("unchecked")
- @Override
- public int compare(I index1, I index2) {
- return index1.compareTo(index2);
- }
- }
-
- @Override
- public final boolean addEdge(I targetVertexId, E edgeValue) {
- System.out.println("addEdge: " + targetVertexId + " " + edgeValue + " " + destEdgeIndexList);
- int pos = Collections.binarySearch(destEdgeIndexList,
- targetVertexId,
- new VertexIdComparator());
- if (pos < 0) {
- destEdgeIndexList.add(-1 * (pos + 1), targetVertexId);
- destEdgeValueList.add(-1 * (pos + 1), edgeValue);
- return true;
- } else {
- LOG.warn("addEdge: Vertex=" + vertexId +
- ": already added an edge value for dest vertex id " +
- targetVertexId);
- return false;
- }
- }
-
- @Override
- public long getSuperstep() {
- return getGraphState().getSuperstep();
- }
-
- @Override
- public final void setVertexId(I vertexId) {
- this.vertexId = vertexId;
- }
-
- @Override
- public final I getVertexId() {
- return vertexId;
- }
-
- @Override
- public final V getVertexValue() {
- return vertexValue;
- }
-
- @Override
- public final void setVertexValue(V vertexValue) {
- this.vertexValue = vertexValue;
- }
-
- @Override
- public E getEdgeValue(I targetVertexId) {
- int pos = Collections.binarySearch(destEdgeIndexList,
- targetVertexId,
- new VertexIdComparator());
- if (pos < 0) {
- return null;
- } else {
- return destEdgeValueList.get(pos);
- }
- }
-
- @Override
- public boolean hasEdge(I targetVertexId) {
- int pos = Collections.binarySearch(destEdgeIndexList,
- targetVertexId,
- new VertexIdComparator());
- if (pos < 0) {
- return false;
- } else {
- return true;
- }
- }
-
- /**
- * Get an iterator to the edges on this vertex.
- *
- * @return A <em>sorted</em> iterator, as defined by the sort-order
- * of the vertex ids
- */
- @Override
- public Iterator<I> iterator() {
- return destEdgeIndexList.iterator();
- }
-
- @Override
- public int getNumOutEdges() {
- return destEdgeIndexList.size();
- }
-
- @Override
- public E removeEdge(I targetVertexId) {
- int pos = Collections.binarySearch(destEdgeIndexList,
- targetVertexId,
- new VertexIdComparator());
- if (pos < 0) {
- return null;
- } else {
- destEdgeIndexList.remove(pos);
- return destEdgeValueList.remove(pos);
- }
- }
-
- @Override
- public final void sendMsgToAllEdges(M msg) {
- if (msg == null) {
- throw new IllegalArgumentException(
- "sendMsgToAllEdges: Cannot send null message to all edges");
- }
- for (I index : destEdgeIndexList) {
- sendMsg(index, msg);
- }
- }
-
- @Override
- final public void readFields(DataInput in) throws IOException {
- vertexId = BspUtils.<I>createVertexIndex(getConf());
- vertexId.readFields(in);
- boolean hasVertexValue = in.readBoolean();
- if (hasVertexValue) {
- vertexValue = BspUtils.<V>createVertexValue(getConf());
- vertexValue.readFields(in);
- }
- int edgeListCount = in.readInt();
- destEdgeIndexList = Lists.newArrayListWithCapacity(edgeListCount);
- destEdgeValueList = Lists.newArrayListWithCapacity(edgeListCount);
- for (int i = 0; i < edgeListCount; ++i) {
- I vertexId = BspUtils.<I>createVertexIndex(getConf());
- E edgeValue = BspUtils.<E>createEdgeValue(getConf());
- vertexId.readFields(in);
- edgeValue.readFields(in);
- destEdgeIndexList.add(vertexId);
- destEdgeValueList.add(edgeValue);
- }
- int msgListSize = in.readInt();
- msgList = Lists.newArrayListWithCapacity(msgListSize);
- for (int i = 0; i < msgListSize; ++i) {
- M msg = BspUtils.<M>createMessageValue(getConf());
- msg.readFields(in);
- msgList.add(msg);
- }
- halt = in.readBoolean();
- }
-
- @Override
- final public void write(DataOutput out) throws IOException {
- vertexId.write(out);
- out.writeBoolean(vertexValue != null);
- if (vertexValue != null) {
- vertexValue.write(out);
- }
- out.writeInt(destEdgeIndexList.size());
- for (int i = 0 ; i < destEdgeIndexList.size(); ++i) {
- destEdgeIndexList.get(i).write(out);
- destEdgeValueList.get(i).write(out);
- }
- out.writeInt(msgList.size());
- for (M msg : msgList) {
- msg.write(out);
- }
- out.writeBoolean(halt);
- }
-
- @Override
- void putMessages(Iterable<M> messages) {
- msgList.clear();
- for (M message : messages) {
- msgList.add(message);
- }
- }
-
- @Override
- public Iterable<M> getMessages() {
- return Iterables.unmodifiableIterable(msgList);
- }
-
- @Override
- void releaseResources() {
- // Hint to GC to free the messages
- msgList.clear();
- }
-
- @Override
- public String toString() {
- return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() +
- ",#edges=" + getNumOutEdges() + ")";
- }
+ /**
+ * Comparator for the vertex id
+ */
+ private class VertexIdComparator implements Comparator<I> {
+ @SuppressWarnings("unchecked")
+ @Override
+ public int compare(I index1, I index2) {
+ return index1.compareTo(index2);
+ }
+ }
+
+ @Override
+ public final boolean addEdge(I targetVertexId, E edgeValue) {
+ int pos = Collections.binarySearch(destEdgeIndexList,
+ targetVertexId,
+ new VertexIdComparator());
+ if (pos < 0) {
+ destEdgeIndexList.add(-1 * (pos + 1), targetVertexId);
+ destEdgeValueList.add(-1 * (pos + 1), edgeValue);
+ return true;
+ } else {
+ LOG.warn("addEdge: Vertex=" + vertexId +
+ ": already added an edge value for dest vertex id " +
+ targetVertexId);
+ return false;
+ }
+ }
+
+ @Override
+ public long getSuperstep() {
+ return getGraphState().getSuperstep();
+ }
+
+ @Override
+ public final void setVertexId(I vertexId) {
+ this.vertexId = vertexId;
+ }
+
+ @Override
+ public final I getVertexId() {
+ return vertexId;
+ }
+
+ @Override
+ public final V getVertexValue() {
+ return vertexValue;
+ }
+
+ @Override
+ public final void setVertexValue(V vertexValue) {
+ this.vertexValue = vertexValue;
+ }
+
+ @Override
+ public E getEdgeValue(I targetVertexId) {
+ int pos = Collections.binarySearch(destEdgeIndexList,
+ targetVertexId,
+ new VertexIdComparator());
+ if (pos < 0) {
+ return null;
+ } else {
+ return destEdgeValueList.get(pos);
+ }
+ }
+
+ @Override
+ public boolean hasEdge(I targetVertexId) {
+ int pos = Collections.binarySearch(destEdgeIndexList,
+ targetVertexId,
+ new VertexIdComparator());
+ if (pos < 0) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Get an iterator to the edges on this vertex.
+ *
+ * @return A <em>sorted</em> iterator, as defined by the sort-order
+ * of the vertex ids
+ */
+ @Override
+ public Iterator<I> iterator() {
+ return destEdgeIndexList.iterator();
+ }
+
+ @Override
+ public int getNumOutEdges() {
+ return destEdgeIndexList.size();
+ }
+
+ @Override
+ public E removeEdge(I targetVertexId) {
+ int pos = Collections.binarySearch(destEdgeIndexList,
+ targetVertexId,
+ new VertexIdComparator());
+ if (pos < 0) {
+ return null;
+ } else {
+ destEdgeIndexList.remove(pos);
+ return destEdgeValueList.remove(pos);
+ }
+ }
+
+ @Override
+ public final void sendMsgToAllEdges(M msg) {
+ if (msg == null) {
+ throw new IllegalArgumentException(
+ "sendMsgToAllEdges: Cannot send null message to all edges");
+ }
+ for (I index : destEdgeIndexList) {
+ sendMsg(index, msg);
+ }
+ }
+
+ @Override
+ public final void readFields(DataInput in) throws IOException {
+ vertexId = BspUtils.<I>createVertexIndex(getConf());
+ vertexId.readFields(in);
+ boolean hasVertexValue = in.readBoolean();
+ if (hasVertexValue) {
+ vertexValue = BspUtils.<V>createVertexValue(getConf());
+ vertexValue.readFields(in);
+ }
+ int edgeListCount = in.readInt();
+ destEdgeIndexList = Lists.newArrayListWithCapacity(edgeListCount);
+ destEdgeValueList = Lists.newArrayListWithCapacity(edgeListCount);
+ for (int i = 0; i < edgeListCount; ++i) {
+ I destVertexId = BspUtils.<I>createVertexIndex(getConf());
+ E edgeValue = BspUtils.<E>createEdgeValue(getConf());
+ destVertexId.readFields(in);
+ edgeValue.readFields(in);
+ destEdgeIndexList.add(destVertexId);
+ destEdgeValueList.add(edgeValue);
+ }
+ int msgListSize = in.readInt();
+ msgList = Lists.newArrayListWithCapacity(msgListSize);
+ for (int i = 0; i < msgListSize; ++i) {
+ M msg = BspUtils.<M>createMessageValue(getConf());
+ msg.readFields(in);
+ msgList.add(msg);
+ }
+ halt = in.readBoolean();
+ }
+
+ @Override
+ public final void write(DataOutput out) throws IOException {
+ vertexId.write(out);
+ out.writeBoolean(vertexValue != null);
+ if (vertexValue != null) {
+ vertexValue.write(out);
+ }
+ out.writeInt(destEdgeIndexList.size());
+ for (int i = 0; i < destEdgeIndexList.size(); ++i) {
+ destEdgeIndexList.get(i).write(out);
+ destEdgeValueList.get(i).write(out);
+ }
+ out.writeInt(msgList.size());
+ for (M msg : msgList) {
+ msg.write(out);
+ }
+ out.writeBoolean(halt);
+ }
+
+ @Override
+ void putMessages(Iterable<M> messages) {
+ msgList.clear();
+ for (M message : messages) {
+ msgList.add(message);
+ }
+ }
+
+ @Override
+ public Iterable<M> getMessages() {
+ return Iterables.unmodifiableIterable(msgList);
+ }
+
+ @Override
+ void releaseResources() {
+ // Hint to GC to free the messages
+ msgList.clear();
+ }
+
+ @Override
+ public String toString() {
+ return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() +
+ ",#edges=" + getNumOutEdges() + ")";
+ }
}