You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/20 19:27:06 UTC
[10/12] GIRAPH-667: Decouple Vertex data and Computation,
make Computation and Combiner classes switchable (majakabiljo)
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index 10e4975..621bb14 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -20,11 +20,11 @@ package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.aggregators.TextAggregatorWriter;
import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.graph.Computation;
import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.graph.DefaultVertexResolver;
import org.apache.giraph.graph.DefaultVertexValueFactory;
-import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.VertexValueFactory;
import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
@@ -36,11 +36,9 @@ import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.giraph.master.MasterCompute;
-import org.apache.giraph.partition.DefaultPartitionContext;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.partition.HashPartitionerFactory;
import org.apache.giraph.partition.Partition;
-import org.apache.giraph.partition.PartitionContext;
import org.apache.giraph.partition.SimplePartition;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.worker.DefaultWorkerContext;
@@ -57,22 +55,25 @@ import java.util.List;
* @param <I> Vertex ID class
* @param <V> Vertex Value class
* @param <E> Edge class
- * @param <M> Message class
*/
@SuppressWarnings("unchecked")
public class GiraphClasses<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
+ V extends Writable, E extends Writable>
implements GiraphConstants {
- /** Vertex class - cached for fast access */
- protected Class<? extends Vertex<I, V, E, M>> vertexClass;
+ /** Computation class - cached for fast access */
+ protected Class<? extends
+ Computation<I, V, E, ? extends Writable, ? extends Writable>>
+ computationClass;
/** Vertex id class - cached for fast access */
protected Class<I> vertexIdClass;
/** Vertex value class - cached for fast access */
protected Class<V> vertexValueClass;
/** Edge value class - cached for fast access */
protected Class<E> edgeValueClass;
- /** Message value class - cached for fast access */
- protected Class<M> messageValueClass;
+ /** Incoming message value class - cached for fast access */
+ protected Class<? extends Writable> incomingMessageValueClass;
+ /** Outgoing message value class - cached for fast access */
+ protected Class<? extends Writable> outgoingMessageValueClass;
/** Vertex edges class - cached for fast access */
protected Class<? extends OutEdges<I, E>> outEdgesClass;
/** Input vertex edges class - cached for fast access */
@@ -82,7 +83,7 @@ public class GiraphClasses<I extends WritableComparable,
protected Class<? extends VertexValueFactory<V>> vertexValueFactoryClass;
/** Graph partitioner factory class - cached for fast access */
- protected Class<? extends GraphPartitionerFactory<I, V, E, M>>
+ protected Class<? extends GraphPartitionerFactory<I, V, E>>
graphPartitionerFactoryClass;
/** Vertex input format class - cached for fast access */
@@ -98,24 +99,22 @@ public class GiraphClasses<I extends WritableComparable,
/** Aggregator writer class - cached for fast access */
protected Class<? extends AggregatorWriter> aggregatorWriterClass;
/** Combiner class - cached for fast access */
- protected Class<? extends Combiner<I, M>> combinerClass;
+ protected Class<? extends Combiner<I, ? extends Writable>> combinerClass;
/** Vertex resolver class - cached for fast access */
- protected Class<? extends VertexResolver<I, V, E, M>> vertexResolverClass;
- /** Partition context class - cached for fast access */
- protected Class<? extends PartitionContext> partitionContextClass;
+ protected Class<? extends VertexResolver<I, V, E>> vertexResolverClass;
/** Worker context class - cached for fast access */
protected Class<? extends WorkerContext> workerContextClass;
/** Master compute class - cached for fast access */
protected Class<? extends MasterCompute> masterComputeClass;
/** Partition class - cached for fast access */
- protected Class<? extends Partition<I, V, E, M>> partitionClass;
+ protected Class<? extends Partition<I, V, E>> partitionClass;
/** Edge Input Filter class */
protected Class<? extends EdgeInputFilter<I, E>> edgeInputFilterClass;
/** Vertex Input Filter class */
- protected Class<? extends VertexInputFilter<I, V, E, M>>
+ protected Class<? extends VertexInputFilter<I, V, E>>
vertexInputFilterClass;
/**
@@ -131,19 +130,18 @@ public class GiraphClasses<I extends WritableComparable,
vertexValueFactoryClass = (Class<? extends VertexValueFactory<V>>) (Object)
DefaultVertexValueFactory.class;
graphPartitionerFactoryClass =
- (Class<? extends GraphPartitionerFactory<I, V, E, M>>) (Object)
+ (Class<? extends GraphPartitionerFactory<I, V, E>>) (Object)
HashPartitionerFactory.class;
aggregatorWriterClass = TextAggregatorWriter.class;
- vertexResolverClass = (Class<? extends VertexResolver<I, V, E, M>>)
+ vertexResolverClass = (Class<? extends VertexResolver<I, V, E>>)
(Object) DefaultVertexResolver.class;
- partitionContextClass = DefaultPartitionContext.class;
workerContextClass = DefaultWorkerContext.class;
masterComputeClass = DefaultMasterCompute.class;
- partitionClass = (Class<? extends Partition<I, V, E, M>>) (Object)
+ partitionClass = (Class<? extends Partition<I, V, E>>) (Object)
SimplePartition.class;
edgeInputFilterClass = (Class<? extends EdgeInputFilter<I, E>>)
(Object) DefaultEdgeInputFilter.class;
- vertexInputFilterClass = (Class<? extends VertexInputFilter<I, V, E, M>>)
+ vertexInputFilterClass = (Class<? extends VertexInputFilter<I, V, E>>)
(Object) DefaultVertexInputFilter.class;
}
@@ -163,13 +161,17 @@ public class GiraphClasses<I extends WritableComparable,
*/
private void readFromConf(Configuration conf) {
// set pre-validated generic parameter types into Configuration
- vertexClass = (Class<? extends Vertex<I, V, E, M>>) VERTEX_CLASS.get(conf);
- List<Class<?>> classList = ReflectionUtils.getTypeArguments(Vertex.class,
- vertexClass);
+ computationClass =
+ (Class<? extends
+ Computation<I, V, E, ? extends Writable, ? extends Writable>>)
+ COMPUTATION_CLASS.get(conf);
+ List<Class<?>> classList =
+ ReflectionUtils.getTypeArguments(Computation.class, computationClass);
vertexIdClass = (Class<I>) classList.get(0);
vertexValueClass = (Class<V>) classList.get(1);
edgeValueClass = (Class<E>) classList.get(2);
- messageValueClass = (Class<M>) classList.get(3);
+ incomingMessageValueClass = (Class<? extends Writable>) classList.get(3);
+ outgoingMessageValueClass = (Class<? extends Writable>) classList.get(4);
outEdgesClass = (Class<? extends OutEdges<I, E>>)
VERTEX_EDGES_CLASS.get(conf);
@@ -179,7 +181,7 @@ public class GiraphClasses<I extends WritableComparable,
VERTEX_VALUE_FACTORY_CLASS.get(conf);
graphPartitionerFactoryClass =
- (Class<? extends GraphPartitionerFactory<I, V, E, M>>)
+ (Class<? extends GraphPartitionerFactory<I, V, E>>)
GRAPH_PARTITIONER_FACTORY_CLASS.get(conf);
vertexInputFormatClass = (Class<? extends VertexInputFormat<I, V, E>>)
@@ -190,29 +192,30 @@ public class GiraphClasses<I extends WritableComparable,
EDGE_INPUT_FORMAT_CLASS.get(conf);
aggregatorWriterClass = AGGREGATOR_WRITER_CLASS.get(conf);
- combinerClass = (Class<? extends Combiner<I, M>>)
+ combinerClass = (Class<? extends Combiner<I, ? extends Writable>>)
VERTEX_COMBINER_CLASS.get(conf);
- vertexResolverClass = (Class<? extends VertexResolver<I, V, E, M>>)
+ vertexResolverClass = (Class<? extends VertexResolver<I, V, E>>)
VERTEX_RESOLVER_CLASS.get(conf);
- partitionContextClass = PARTITION_CONTEXT_CLASS.get(conf);
workerContextClass = WORKER_CONTEXT_CLASS.get(conf);
masterComputeClass = MASTER_COMPUTE_CLASS.get(conf);
- partitionClass = (Class<? extends Partition<I, V, E, M>>)
+ partitionClass = (Class<? extends Partition<I, V, E>>)
PARTITION_CLASS.get(conf);
edgeInputFilterClass = (Class<? extends EdgeInputFilter<I, E>>)
EDGE_INPUT_FILTER_CLASS.get(conf);
- vertexInputFilterClass = (Class<? extends VertexInputFilter<I, V, E, M>>)
+ vertexInputFilterClass = (Class<? extends VertexInputFilter<I, V, E>>)
VERTEX_INPUT_FILTER_CLASS.get(conf);
}
/**
- * Get Vertex class
+ * Get Computation class
*
- * @return Vertex class.
+ * @return Computation class.
*/
- public Class<? extends Vertex<I, V, E, M>> getVertexClass() {
- return vertexClass;
+ public Class<? extends
+ Computation<I, V, E, ? extends Writable, ? extends Writable>>
+ getComputationClass() {
+ return computationClass;
}
/**
@@ -243,12 +246,23 @@ public class GiraphClasses<I extends WritableComparable,
}
/**
- * Get Message Value class
+ * Get incoming Message Value class - messages which have been sent in the
+ * previous superstep and are processed in the current one
*
* @return Message Value class
*/
- public Class<M> getMessageValueClass() {
- return messageValueClass;
+ public Class<? extends Writable> getIncomingMessageValueClass() {
+ return incomingMessageValueClass;
+ }
+
+ /**
+ * Get outgoing Message Value class - messages which are going to be sent
+ * during current superstep
+ *
+ * @return Message Value class
+ */
+ public Class<? extends Writable> getOutgoingMessageValueClass() {
+ return outgoingMessageValueClass;
}
/**
@@ -283,7 +297,7 @@ public class GiraphClasses<I extends WritableComparable,
*
* @return GraphPartitionerFactory
*/
- public Class<? extends GraphPartitionerFactory<I, V, E, M>>
+ public Class<? extends GraphPartitionerFactory<I, V, E>>
getGraphPartitionerFactoryClass() {
return graphPartitionerFactoryClass;
}
@@ -293,7 +307,7 @@ public class GiraphClasses<I extends WritableComparable,
return edgeInputFilterClass;
}
- public Class<? extends VertexInputFilter<I, V, E, M>>
+ public Class<? extends VertexInputFilter<I, V, E>>
getVertexInputFilterClass() {
return vertexInputFilterClass;
}
@@ -386,7 +400,7 @@ public class GiraphClasses<I extends WritableComparable,
*
* @return Combiner
*/
- public Class<? extends Combiner<I, M>> getCombinerClass() {
+ public Class<? extends Combiner<I, ? extends Writable>> getCombinerClass() {
return combinerClass;
}
@@ -404,29 +418,11 @@ public class GiraphClasses<I extends WritableComparable,
*
* @return VertexResolver
*/
- public Class<? extends VertexResolver<I, V, E, M>> getVertexResolverClass() {
+ public Class<? extends VertexResolver<I, V, E>> getVertexResolverClass() {
return vertexResolverClass;
}
/**
- * Check if PartitionContext is set
- *
- * @return true if PartitionContext is set
- */
- public boolean hasPartitionContextClass() {
- return partitionContextClass != null;
- }
-
- /**
- * Get PartitionContext used
- *
- * @return PartitionContext
- */
- public Class<? extends PartitionContext> getPartitionContextClass() {
- return partitionContextClass;
- }
-
- /**
* Check if WorkerContext is set
*
* @return true if WorkerContext is set
@@ -476,19 +472,24 @@ public class GiraphClasses<I extends WritableComparable,
*
* @return Partition
*/
- public Class<? extends Partition<I, V, E, M>> getPartitionClass() {
+ public Class<? extends Partition<I, V, E>> getPartitionClass() {
return partitionClass;
}
/**
- * Set Vertex class held
+ * Set Computation class held, and update message types
*
- * @param vertexClass Vertex class to set
+ * @param computationClass Computation class to set
* @return this
*/
- public GiraphClasses setVertexClass(
- Class<? extends Vertex<I, V, E, M>> vertexClass) {
- this.vertexClass = vertexClass;
+ public GiraphClasses setComputationClass(Class<? extends
+ Computation<I, V, E, ? extends Writable, ? extends Writable>>
+ computationClass) {
+ this.computationClass = computationClass;
+ List<Class<?>> classList =
+ ReflectionUtils.getTypeArguments(Computation.class, computationClass);
+ incomingMessageValueClass = (Class<? extends Writable>) classList.get(3);
+ outgoingMessageValueClass = (Class<? extends Writable>) classList.get(4);
return this;
}
@@ -526,13 +527,28 @@ public class GiraphClasses<I extends WritableComparable,
}
/**
- * Set Message Value class held
+ * Set incoming Message Value class held - messages which have been sent in
+ * the previous superstep and are processed in the current one
+ *
+ * @param incomingMessageValueClass Message Value class to set
+ * @return this
+ */
+ public GiraphClasses setIncomingMessageValueClass(
+ Class<? extends Writable> incomingMessageValueClass) {
+ this.incomingMessageValueClass = incomingMessageValueClass;
+ return this;
+ }
+
+ /**
+ * Set outgoing Message Value class held - messages which are going to be sent
+ * during current superstep
*
- * @param messageValueClass Message Value class to set
+ * @param outgoingMessageValueClass Message Value class to set
* @return this
*/
- public GiraphClasses setMessageValueClass(Class<M> messageValueClass) {
- this.messageValueClass = messageValueClass;
+ public GiraphClasses setOutgoingMessageValueClass(
+ Class<? extends Writable> outgoingMessageValueClass) {
+ this.outgoingMessageValueClass = outgoingMessageValueClass;
return this;
}
@@ -583,7 +599,7 @@ public class GiraphClasses<I extends WritableComparable,
* @return this
*/
public GiraphClasses setGraphPartitionerFactoryClass(
- Class<? extends GraphPartitionerFactory<I, V, E, M>> klass) {
+ Class<? extends GraphPartitionerFactory<I, V, E>> klass) {
this.graphPartitionerFactoryClass = klass;
return this;
}
@@ -643,7 +659,7 @@ public class GiraphClasses<I extends WritableComparable,
* @return this
*/
public GiraphClasses setCombinerClass(
- Class<? extends Combiner<I, M>> combinerClass) {
+ Class<? extends Combiner<I, ? extends Writable>> combinerClass) {
this.combinerClass = combinerClass;
return this;
}
@@ -655,24 +671,12 @@ public class GiraphClasses<I extends WritableComparable,
* @return this
*/
public GiraphClasses setVertexResolverClass(
- Class<? extends VertexResolver<I, V, E, M>> vertexResolverClass) {
+ Class<? extends VertexResolver<I, V, E>> vertexResolverClass) {
this.vertexResolverClass = vertexResolverClass;
return this;
}
/**
- * Set PartitionContext used
- *
- * @param partitionContextClass PartitionContext class to set
- * @return this
- */
- public GiraphClasses setPartitionContextClass(
- Class<? extends PartitionContext> partitionContextClass) {
- this.partitionContextClass = partitionContextClass;
- return this;
- }
-
- /**
* Set WorkerContext used
*
* @param workerContextClass WorkerContext class to set
@@ -703,7 +707,7 @@ public class GiraphClasses<I extends WritableComparable,
* @return this
*/
public GiraphClasses setPartitionClass(
- Class<? extends Partition<I, V, E, M>> partitionClass) {
+ Class<? extends Partition<I, V, E>> partitionClass) {
this.partitionClass = partitionClass;
return this;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 754fad9..58a3f01 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -20,9 +20,9 @@ package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.graph.Computation;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.edge.ReuseObjectsOutEdges;
-import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.VertexValueFactory;
import org.apache.giraph.io.EdgeInputFormat;
@@ -35,7 +35,6 @@ import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.master.MasterObserver;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.partition.Partition;
-import org.apache.giraph.partition.PartitionContext;
import org.apache.giraph.partition.ReusesObjectsPartition;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerObserver;
@@ -79,22 +78,22 @@ public class GiraphConfiguration extends Configuration
}
/**
- * Get the user's subclassed {@link org.apache.giraph.graph.Vertex}
+ * Get the user's subclassed {@link Computation}
*
- * @return User's vertex class
+ * @return User's computation class
*/
- public Class<? extends Vertex> getVertexClass() {
- return VERTEX_CLASS.get(this);
+ public Class<? extends Computation> getComputationClass() {
+ return COMPUTATION_CLASS.get(this);
}
/**
- * Set the vertex class (required)
+ * Set the computation class (required)
*
- * @param vertexClass Runs vertex computation
+ * @param computationClass Runs vertex computation
*/
- public final void setVertexClass(
- Class<? extends Vertex> vertexClass) {
- VERTEX_CLASS.set(this, vertexClass);
+ public void setComputationClass(
+ Class<? extends Computation> computationClass) {
+ COMPUTATION_CLASS.set(this, computationClass);
}
/**
@@ -407,7 +406,7 @@ public class GiraphConfiguration extends Configuration
*
* @param vertexCombinerClass Determines how vertex messages are combined
*/
- public final void setCombinerClass(
+ public void setCombinerClass(
Class<? extends Combiner> vertexCombinerClass) {
VERTEX_COMBINER_CLASS.set(this, vertexCombinerClass);
}
@@ -452,17 +451,6 @@ public class GiraphConfiguration extends Configuration
}
/**
- * Set the partition context class (optional)
- *
- * @param partitionContextClass Determines what code is executed for each
- * partition before and after each superstep
- */
- public final void setPartitionContextClass(
- Class<? extends PartitionContext> partitionContextClass) {
- PARTITION_CONTEXT_CLASS.set(this, partitionContextClass);
- }
-
- /**
* Set the worker context class (optional)
*
* @param workerContextClass Determines what code is executed on each
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index bbf50e5..2d0f59c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -20,11 +20,11 @@ package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.aggregators.TextAggregatorWriter;
import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.graph.Computation;
import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.graph.DefaultVertexResolver;
import org.apache.giraph.graph.DefaultVertexValueFactory;
-import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.VertexValueFactory;
import org.apache.giraph.io.EdgeInputFormat;
@@ -39,11 +39,9 @@ import org.apache.giraph.job.GiraphJobObserver;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.master.MasterObserver;
-import org.apache.giraph.partition.DefaultPartitionContext;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.partition.HashPartitionerFactory;
import org.apache.giraph.partition.Partition;
-import org.apache.giraph.partition.PartitionContext;
import org.apache.giraph.partition.SimplePartition;
import org.apache.giraph.worker.DefaultWorkerContext;
import org.apache.giraph.worker.WorkerContext;
@@ -62,9 +60,10 @@ public interface GiraphConstants {
/** 1KB in bytes */
int ONE_KB = 1024;
- /** Vertex class - required */
- ClassConfOption<Vertex> VERTEX_CLASS =
- ClassConfOption.create("giraph.vertexClass", null, Vertex.class);
+ /** Computation class - required */
+ ClassConfOption<Computation> COMPUTATION_CLASS =
+ ClassConfOption.create("giraph.computationClass", null,
+ Computation.class);
/** Vertex value factory class - optional */
ClassConfOption<VertexValueFactory> VERTEX_VALUE_FACTORY_CLASS =
ClassConfOption.create("giraph.vertexValueFactoryClass",
@@ -184,13 +183,14 @@ public interface GiraphConstants {
/** Edge value class */
ClassConfOption<Writable> EDGE_VALUE_CLASS =
ClassConfOption.create("giraph.edgeValueClass", null, Writable.class);
- /** Message value class */
- ClassConfOption<Writable> MESSAGE_VALUE_CLASS =
- ClassConfOption.create("giraph.messageValueClass", null, Writable.class);
- /** Partition context class */
- ClassConfOption<PartitionContext> PARTITION_CONTEXT_CLASS =
- ClassConfOption.create("giraph.partitionContextClass",
- DefaultPartitionContext.class, PartitionContext.class);
+ /** Incoming message value class */
+ ClassConfOption<Writable> INCOMING_MESSAGE_VALUE_CLASS =
+ ClassConfOption.create("giraph.incomingMessageValueClass", null,
+ Writable.class);
+ /** Outgoing message value class */
+ ClassConfOption<Writable> OUTGOING_MESSAGE_VALUE_CLASS =
+ ClassConfOption.create("giraph.outgoingMessageValueClass", null,
+ Writable.class);
/** Worker context class */
ClassConfOption<WorkerContext> WORKER_CONTEXT_CLASS =
ClassConfOption.create("giraph.workerContextClass",
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfigurable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfigurable.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfigurable.java
index e4351a2..dbffbc7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfigurable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfigurable.java
@@ -27,23 +27,20 @@ import org.apache.hadoop.io.WritableComparable;
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
- * @param <M> Message data
*/
public interface ImmutableClassesGiraphConfigurable<
- I extends WritableComparable, V extends Writable, E extends Writable,
- M extends Writable> {
+ I extends WritableComparable, V extends Writable, E extends Writable> {
/**
* Set the configuration to be used by this object.
*
* @param configuration Set configuration
*/
- void setConf(ImmutableClassesGiraphConfiguration<I, V, E, M>
- configuration);
+ void setConf(ImmutableClassesGiraphConfiguration<I, V, E> configuration);
/**
* Return the configuration used by this object.
*
* @return Set configuration
*/
- ImmutableClassesGiraphConfiguration<I, V, E, M> getConf();
+ ImmutableClassesGiraphConfiguration<I, V, E> getConf();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index a9add4f..aa52498 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -20,11 +20,11 @@ package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.graph.Computation;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.edge.ReusableEdge;
-import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.VertexValueFactory;
@@ -43,9 +43,9 @@ import org.apache.giraph.io.superstep_output.SynchronizedSuperstepOutput;
import org.apache.giraph.job.GiraphJobObserver;
import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.master.MasterObserver;
+import org.apache.giraph.master.SuperstepClasses;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.partition.Partition;
-import org.apache.giraph.partition.PartitionContext;
import org.apache.giraph.utils.ExtendedByteArrayDataInput;
import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
import org.apache.giraph.utils.ExtendedDataInput;
@@ -70,11 +70,10 @@ import org.apache.hadoop.util.Progressable;
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
- * @param <M> Message data
*/
@SuppressWarnings("unchecked")
public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
+ V extends Writable, E extends Writable>
extends GiraphConfiguration {
/** Holder for all the classes */
private final GiraphClasses classes;
@@ -95,7 +94,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
*/
public ImmutableClassesGiraphConfiguration(Configuration conf) {
super(conf);
- classes = new GiraphClasses<I, V, E, M>(conf);
+ classes = new GiraphClasses<I, V, E>(conf);
useUnsafeSerialization = USE_UNSAFE_SERIALIZATION.get(this);
try {
vertexValueFactory = (VertexValueFactory<V>)
@@ -113,23 +112,6 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
- * Create a new ImmutableClassesGiraphConfiguration. This is a convenience
- * method to make it easier to deal with generics.
- *
- * @param conf Configuration to read
- * @param <I> Vertex ID
- * @param <V> Vertex Value
- * @param <E> Edge Value
- * @param <M> Message Value
- * @return new ImmutableClassesGiraphConfiguration
- */
- public static <I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- ImmutableClassesGiraphConfiguration<I, V, E, M> create(Configuration conf) {
- return new ImmutableClassesGiraphConfiguration<I, V, E, M>(conf);
- }
-
- /**
* Configure an object with this instance if the object is configurable.
* @param obj Object
*/
@@ -162,7 +144,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
*
* @return VertexInputFilter class
*/
- public Class<? extends VertexInputFilter<I, V, E, M>>
+ public Class<? extends VertexInputFilter<I, V, E>>
getVertexInputFilterClass() {
return classes.getVertexInputFilterClass();
}
@@ -181,7 +163,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
*
* @return User's graph partitioner
*/
- public Class<? extends GraphPartitionerFactory<I, V, E, M>>
+ public Class<? extends GraphPartitionerFactory<I, V, E>>
getGraphPartitionerClass() {
return classes.getGraphPartitionerFactoryClass();
}
@@ -191,8 +173,8 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
*
* @return Instantiated user graph partitioner class
*/
- public GraphPartitionerFactory<I, V, E, M> createGraphPartitioner() {
- Class<? extends GraphPartitionerFactory<I, V, E, M>> klass =
+ public GraphPartitionerFactory<I, V, E> createGraphPartitioner() {
+ Class<? extends GraphPartitionerFactory<I, V, E>> klass =
classes.getGraphPartitionerFactoryClass();
return ReflectionUtils.newInstance(klass, this);
}
@@ -380,17 +362,18 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
*
* @return User's combiner class
*/
- public Class<? extends Combiner<I, M>> getCombinerClass() {
+ public Class<? extends Combiner<I, ? extends Writable>> getCombinerClass() {
return classes.getCombinerClass();
}
/**
* Create a user combiner class
*
+ * @param <M> Message data
* @return Instantiated user combiner class
*/
@SuppressWarnings("rawtypes")
- public Combiner<I, M> createCombiner() {
+ public <M extends Writable> Combiner<I, M> createCombiner() {
Class<? extends Combiner<I, M>> klass = classes.getCombinerClass();
return ReflectionUtils.newInstance(klass, this);
}
@@ -409,41 +392,17 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
*
* @return User's vertex resolver class
*/
- public Class<? extends VertexResolver<I, V, E, M>> getVertexResolverClass() {
+ public Class<? extends VertexResolver<I, V, E>> getVertexResolverClass() {
return classes.getVertexResolverClass();
}
/**
* Create a user vertex resolver
*
- * @param graphState State of the graph from the worker
* @return Instantiated user vertex resolver
*/
- @SuppressWarnings("rawtypes")
- public VertexResolver<I, V, E, M> createVertexResolver(
- GraphState<I, V, E, M> graphState) {
- VertexResolver<I, V, E, M> resolver =
- ReflectionUtils.newInstance(getVertexResolverClass(), this);
- resolver.setGraphState(graphState);
- return resolver;
- }
-
- /**
- * Get the user's subclassed PartitionContext.
- *
- * @return User's partition context class
- */
- public Class<? extends PartitionContext> getPartitionContextClass() {
- return classes.getPartitionContextClass();
- }
-
- /**
- * Create a user partition context
- *
- * @return Instantiated user partition context
- */
- public PartitionContext createPartitionContext() {
- return ReflectionUtils.newInstance(getPartitionContextClass(), this);
+ public VertexResolver<I, V, E> createVertexResolver() {
+ return ReflectionUtils.newInstance(getVertexResolverClass(), this);
}
/**
@@ -458,15 +417,10 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
/**
* Create a user worker context
*
- * @param graphState State of the graph from the worker
* @return Instantiated user worker context
*/
- @SuppressWarnings("rawtypes")
- public WorkerContext createWorkerContext(GraphState<I, V, E, M> graphState) {
- WorkerContext workerContext =
- ReflectionUtils.newInstance(getWorkerContextClass(), this);
- workerContext.setGraphState(graphState);
- return workerContext;
+ public WorkerContext createWorkerContext() {
+ return ReflectionUtils.newInstance(getWorkerContextClass(), this);
}
/**
@@ -488,17 +442,29 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
@Override
- public Class<? extends Vertex<I, V, E, M>> getVertexClass() {
- return classes.getVertexClass();
+ public Class<? extends
+ Computation<I, V, E, ? extends Writable, ? extends Writable>>
+ getComputationClass() {
+ return classes.getComputationClass();
}
/**
- * Create a user vertex
+ * Create a user computation
*
- * @return Instantiated user vertex
+ * @return Instantiated user computation
*/
- public Vertex<I, V, E, M> createVertex() {
- return ReflectionUtils.newInstance(getVertexClass(), this);
+ public Computation<I, V, E, ? extends Writable, ? extends Writable>
+ createComputation() {
+ return ReflectionUtils.newInstance(getComputationClass(), this);
+ }
+
+ /**
+ * Create a vertex
+ *
+ * @return Instantiated vertex
+ */
+ public Vertex<I, V, E> createVertex() {
+ return ReflectionUtils.newInstance(Vertex.class, this);
}
/**
@@ -658,34 +624,58 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
- * Get the user's subclassed vertex message value class.
+ * Get the user's subclassed incoming message value class.
*
+ * @param <M> Message data
* @return User's vertex message value class
*/
- @SuppressWarnings("unchecked")
- public Class<M> getMessageValueClass() {
- return classes.getMessageValueClass();
+ public <M extends Writable> Class<M> getIncomingMessageValueClass() {
+ return classes.getIncomingMessageValueClass();
}
/**
- * Create a user vertex message value
+ * Get the user's subclassed outgoing message value class.
*
- * @return Instantiated user vertex message value
+ * @param <M> Message data
+ * @return User's outgoing message value class
*/
- public M createMessageValue() {
- Class<M> klass = getMessageValueClass();
- if (klass == NullWritable.class) {
+ public <M extends Writable> Class<M> getOutgoingMessageValueClass() {
+ return classes.getOutgoingMessageValueClass();
+ }
+
+ /**
+ * Create incoming message value
+ *
+ * @param <M> Message data
+ * @return Incoming message value
+ */
+ public <M extends Writable> M createIncomingMessageValue() {
+ return this.<M>createMessageValue(this.<M>getIncomingMessageValueClass());
+ }
+
+ /**
+ * Create outgoing message value
+ *
+ * @param <M> Message data
+ * @return Outgoing message value
+ */
+ public <M extends Writable> M createOutgoingMessageValue() {
+ return this.<M>createMessageValue(this.<M>getOutgoingMessageValueClass());
+ }
+
+ /**
+ * Create a message value
+ *
+ * @param <M> Message data
+ * @param messageClass Message class
+ * @return Instantiated message value
+ */
+ private <M extends Writable> M createMessageValue(
+ Class<? extends Writable> messageClass) {
+ if (messageClass == NullWritable.class) {
return (M) NullWritable.get();
} else {
- try {
- return klass.newInstance();
- } catch (InstantiationException e) {
- throw new IllegalArgumentException(
- "createMessageValue: Failed to instantiate", e);
- } catch (IllegalAccessException e) {
- throw new IllegalArgumentException(
- "createMessageValue: Illegally accessed", e);
- }
+ return (M) ReflectionUtils.newInstance(messageClass);
}
}
@@ -792,10 +782,10 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
* @param progressable Progressable for reporting progress
* @return Instantiated partition
*/
- public Partition<I, V, E, M> createPartition(
+ public Partition<I, V, E> createPartition(
int id, Progressable progressable) {
- Class<? extends Partition<I, V, E, M>> klass = classes.getPartitionClass();
- Partition<I, V, E, M> partition = ReflectionUtils.newInstance(klass, this);
+ Class<? extends Partition<I, V, E>> klass = classes.getPartitionClass();
+ Partition<I, V, E> partition = ReflectionUtils.newInstance(klass, this);
partition.initialize(id, progressable);
return partition;
}
@@ -868,4 +858,14 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
return new ExtendedByteArrayDataInput(buf, off, length);
}
}
+
+ /**
+ * Update Computation and Combiner class used
+ *
+ * @param superstepClasses SuperstepClasses
+ */
+ public void updateSuperstepClasses(SuperstepClasses superstepClasses) {
+ classes.setComputationClass(superstepClasses.getComputationClass());
+ classes.setCombinerClass(superstepClasses.getCombinerClass());
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/edge/ConfigurableOutEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/ConfigurableOutEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/ConfigurableOutEdges.java
index 631b209..a983ee4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/ConfigurableOutEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/ConfigurableOutEdges.java
@@ -31,6 +31,6 @@ import org.apache.hadoop.io.WritableComparable;
*/
public abstract class ConfigurableOutEdges<I extends WritableComparable,
E extends Writable>
- extends DefaultImmutableClassesGiraphConfigurable<I, Writable, E,
- Writable> implements OutEdges<I, E> {
+ extends DefaultImmutableClassesGiraphConfigurable<I, Writable, E>
+ implements OutEdges<I, E> {
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
index 9e2d246..420bf93 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
@@ -41,16 +41,15 @@ import org.apache.log4j.Logger;
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message data
*/
public class EdgeStore<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
+ V extends Writable, E extends Writable> {
/** Class logger */
private static final Logger LOG = Logger.getLogger(EdgeStore.class);
/** Service worker. */
- private CentralizedServiceWorker<I, V, E, M> service;
+ private CentralizedServiceWorker<I, V, E> service;
/** Giraph configuration. */
- private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+ private ImmutableClassesGiraphConfiguration<I, V, E> configuration;
/** Progressable to report progress. */
private Progressable progressable;
/** Map used to temporarily store incoming edges. */
@@ -75,8 +74,8 @@ public class EdgeStore<I extends WritableComparable,
* @param progressable Progressable
*/
public EdgeStore(
- CentralizedServiceWorker<I, V, E, M> service,
- ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+ CentralizedServiceWorker<I, V, E> service,
+ ImmutableClassesGiraphConfiguration<I, V, E> configuration,
Progressable progressable) {
this.service = service;
this.configuration = configuration;
@@ -179,14 +178,14 @@ public class EdgeStore<I extends WritableComparable,
public Void call() throws Exception {
Integer partitionId;
while ((partitionId = partitionIdQueue.poll()) != null) {
- Partition<I, V, E, M> partition =
+ Partition<I, V, E> partition =
service.getPartitionStore().getPartition(partitionId);
ConcurrentMap<I, OutEdges<I, E>> partitionEdges =
transientEdges.remove(partitionId);
for (I vertexId : partitionEdges.keySet()) {
OutEdges<I, E> outEdges = convertInputToComputeEdges(
partitionEdges.remove(vertexId));
- Vertex<I, V, E, M> vertex = partition.getVertex(vertexId);
+ Vertex<I, V, E> vertex = partition.getVertex(vertexId);
// If the source vertex doesn't exist, create it. Otherwise,
// just set the edges.
if (vertex == null) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesIterable.java b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesIterable.java
index 82486f4..ca32a9f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesIterable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesIterable.java
@@ -34,14 +34,14 @@ import java.util.Iterator;
public class MutableEdgesIterable<I extends WritableComparable,
E extends Writable> implements Iterable<MutableEdge<I, E>> {
/** Vertex that owns the out-edges. */
- private Vertex<I, ?, E, ?> vertex;
+ private Vertex<I, ?, E> vertex;
/**
* Constructor.
*
* @param vertex Owning vertex
*/
- public MutableEdgesIterable(Vertex<I, ?, E, ?> vertex) {
+ public MutableEdgesIterable(Vertex<I, ?, E> vertex) {
this.vertex = vertex;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesWrapper.java b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesWrapper.java
index cd845d0..529234d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesWrapper.java
@@ -73,7 +73,7 @@ public class MutableEdgesWrapper<I extends WritableComparable,
public static <I extends WritableComparable, E extends Writable>
MutableEdgesWrapper<I, E> wrap(
OutEdges<I, E> edges,
- ImmutableClassesGiraphConfiguration<I, ?, E, ?> conf) {
+ ImmutableClassesGiraphConfiguration<I, ?, E> conf) {
MutableEdgesWrapper<I, E> wrapper = new MutableEdgesWrapper<I, E>(
edges, conf.createAndInitializeOutEdges(edges.size()));
return wrapper;
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/BasicComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/BasicComputation.java b/giraph-core/src/main/java/org/apache/giraph/graph/BasicComputation.java
new file mode 100644
index 0000000..180c5d3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/BasicComputation.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Computation in which both incoming and outgoing message types are the same.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message type
+ */
+public abstract class BasicComputation<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends Computation<I, V, E, M, M> {
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
new file mode 100644
index 0000000..84158df
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+
+/**
+ * Basic abstract class for writing a BSP application for computation.
+ *
+ * During the superstep there can be several instances of this class,
+ * each doing computation on one partition of the graph's vertices.
+ *
+ * Note that each thread will have its own {@link Computation},
+ * so accessing any data from this class is thread-safe.
+ * However, accessing global data (like data from {@link WorkerContext})
+ * is not thread-safe.
+ *
+ * Objects of this class only live for a single superstep.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M1> Incoming message type
+ * @param <M2> Outgoing message type
+ */
+public abstract class Computation<I extends WritableComparable,
+ V extends Writable, E extends Writable, M1 extends Writable,
+ M2 extends Writable>
+ extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+ implements WorkerAggregatorUsage {
+ /** Global graph state */
+ private GraphState graphState;
+ /** Handles requests */
+ private WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor;
+ /** Graph-wide BSP Mapper for this Computation */
+ private GraphTaskManager<I, V, E> graphTaskManager;
+ /** Worker aggregator usage */
+ private WorkerAggregatorUsage workerAggregatorUsage;
+ /** Worker context */
+ private WorkerContext workerContext;
+
+ /**
+ * Must be defined by user to do computation on a single Vertex.
+ *
+ * @param vertex Vertex
+ * @param messages Messages that were sent to this vertex in the previous
+ * superstep. Each message is only guaranteed to remain
+ * valid until next() is called on the iterator.
+ */
+ public abstract void compute(Vertex<I, V, E> vertex,
+ Iterable<M1> messages) throws IOException;
+
+ /**
+ * Prepare for computation. This method is executed exactly once prior to
+ * {@link #compute(Vertex, Iterable)} being called for any of the vertices
+ * in the partition.
+ */
+ public void preSuperstep() {
+ }
+
+ /**
+ * Finish computation. This method is executed exactly once after computation
+ * for all vertices in the partition is complete.
+ */
+ public void postSuperstep() {
+ }
+
+ /**
+ * Initialize, called by infrastructure before the superstep starts.
+ * Shouldn't be called by user code.
+ *
+ * @param graphState Graph state
+ * @param workerClientRequestProcessor Processor for handling requests
+ * @param graphTaskManager Graph-wide BSP Mapper for this Computation
+ * @param workerAggregatorUsage Worker aggregator usage
+ * @param workerContext Worker context
+ */
+ public final void initialize(
+ GraphState graphState,
+ WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor,
+ GraphTaskManager<I, V, E> graphTaskManager,
+ WorkerAggregatorUsage workerAggregatorUsage,
+ WorkerContext workerContext) {
+ this.graphState = graphState;
+ this.workerClientRequestProcessor = workerClientRequestProcessor;
+ this.graphTaskManager = graphTaskManager;
+ this.workerAggregatorUsage = workerAggregatorUsage;
+ this.workerContext = workerContext;
+ }
+
+ /**
+ * Retrieves the current superstep.
+ *
+ * @return Current superstep
+ */
+ public long getSuperstep() {
+ return graphState.getSuperstep();
+ }
+
+ /**
+ * Get the total (all workers) number of vertices that
+ * existed in the previous superstep.
+ *
+ * @return Total number of vertices (-1 if first superstep)
+ */
+ public long getTotalNumVertices() {
+ return graphState.getTotalNumVertices();
+ }
+
+ /**
+ * Get the total (all workers) number of edges that
+ * existed in the previous superstep.
+ *
+ * @return Total number of edges (-1 if first superstep)
+ */
+ public long getTotalNumEdges() {
+ return graphState.getTotalNumEdges();
+ }
+
+ /**
+ * Send a message to a vertex id.
+ *
+ * @param id Vertex id to send the message to
+ * @param message Message data to send
+ */
+ public void sendMessage(I id, M2 message) {
+ if (workerClientRequestProcessor.sendMessageRequest(id, message)) {
+ graphTaskManager.notifySentMessages();
+ }
+ }
+
+ /**
+ * Send a message to all edges.
+ *
+ * @param vertex Vertex whose edges to send the message to.
+ * @param message Message sent to all edges.
+ */
+ public void sendMessageToAllEdges(Vertex<I, V, E> vertex, M2 message) {
+ for (Edge<I, E> edge : vertex.getEdges()) {
+ sendMessage(edge.getTargetVertexId(), message);
+ }
+ }
+
+ /**
+ * Sends a request to create a vertex that will be available during the
+ * next superstep.
+ *
+ * @param id Vertex id
+ * @param value Vertex value
+ * @param edges Initial edges
+ */
+ public void addVertexRequest(I id, V value,
+ OutEdges<I, E> edges) throws IOException {
+ Vertex<I, V, E> vertex = getConf().createVertex();
+ vertex.initialize(id, value, edges);
+ workerClientRequestProcessor.addVertexRequest(vertex);
+ }
+
+ /**
+ * Sends a request to create a vertex that will be available during the
+ * next superstep.
+ *
+ * @param id Vertex id
+ * @param value Vertex value
+ */
+ public void addVertexRequest(I id, V value) throws IOException {
+ addVertexRequest(id, value, getConf().createAndInitializeOutEdges());
+ }
+
+ /**
+ * Request to remove a vertex from the graph
+ * (applied just prior to the next superstep).
+ *
+ * @param vertexId Id of the vertex to be removed.
+ */
+ public void removeVertexRequest(I vertexId) throws IOException {
+ workerClientRequestProcessor.removeVertexRequest(vertexId);
+ }
+
+ /**
+ * Request to add an edge of a vertex in the graph
+ * (processed just prior to the next superstep)
+ *
+ * @param sourceVertexId Source vertex id of edge
+ * @param edge Edge to add
+ */
+ public void addEdgeRequest(I sourceVertexId,
+ Edge<I, E> edge) throws IOException {
+ workerClientRequestProcessor.addEdgeRequest(sourceVertexId, edge);
+ }
+
+ /**
+ * Request to remove all edges from a given source vertex to a given target
+ * vertex (processed just prior to the next superstep).
+ *
+ * @param sourceVertexId Source vertex id
+ * @param targetVertexId Target vertex id
+ */
+ public void removeEdgesRequest(I sourceVertexId,
+ I targetVertexId) throws IOException {
+ workerClientRequestProcessor.removeEdgesRequest(
+ sourceVertexId, targetVertexId);
+ }
+
+ /**
+ * Get the mapper context
+ *
+ * @return Mapper context
+ */
+ public Mapper.Context getContext() {
+ return graphState.getContext();
+ }
+
+ /**
+ * Get the worker context
+ *
+ * @param <W> WorkerContext class
+ * @return Worker context, cast to the requested type
+ */
+ @SuppressWarnings("unchecked")
+ public <W extends WorkerContext> W getWorkerContext() {
+ return (W) workerContext;
+ }
+
+ @Override
+ public <A extends Writable> void aggregate(String name, A value) {
+ workerAggregatorUsage.aggregate(name, value);
+ }
+
+ @Override
+ public <A extends Writable> A getAggregatedValue(String name) {
+ return workerAggregatorUsage.<A>getAggregatedValue(name);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 0fc5fdf..6fdcfb0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -28,7 +28,6 @@ import org.apache.giraph.metrics.MetricNames;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.metrics.TimerDesc;
import org.apache.giraph.partition.Partition;
-import org.apache.giraph.partition.PartitionContext;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
@@ -65,10 +64,11 @@ import java.util.concurrent.Callable;
* @param <I> Vertex index value
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message data
+ * @param <M1> Incoming message type
+ * @param <M2> Outgoing message type
*/
public class ComputeCallable<I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable>
+ E extends Writable, M1 extends Writable, M2 extends Writable>
implements Callable<Collection<PartitionStats>> {
/** Class logger */
private static final Logger LOG = Logger.getLogger(ComputeCallable.class);
@@ -76,21 +76,18 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
private static final Time TIME = SystemTime.get();
/** Context */
private final Mapper<?, ?, ?, ?>.Context context;
- /** Graph state (note that it is recreated in call() for locality) */
- private GraphState<I, V, E, M> graphState;
+ /** Graph state */
+ private final GraphState graphState;
/** Thread-safe queue of all partition ids */
private final BlockingQueue<Integer> partitionIdQueue;
/** Message store */
- private final MessageStoreByPartition<I, M> messageStore;
+ private final MessageStoreByPartition<I, M1> messageStore;
/** Configuration */
- private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+ private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
/** Worker (for NettyWorkerClientRequestProcessor) */
- private final CentralizedServiceWorker<I, V, E, M> serviceWorker;
+ private final CentralizedServiceWorker<I, V, E> serviceWorker;
/** Dump some progress every 30 seconds */
private final TimedLogger timedLogger = new TimedLogger(30 * 1000, LOG);
- /** Sends the messages (unique per Callable) */
- private WorkerClientRequestProcessor<I, V, E, M>
- workerClientRequestProcessor;
/** VertexWriter for this ComputeCallable */
private SimpleVertexWriter<I, V, E> vertexWriter;
/** Get the start time in nanos */
@@ -113,17 +110,16 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
* @param serviceWorker Service worker
*/
public ComputeCallable(
- Mapper<?, ?, ?, ?>.Context context, GraphState<I, V, E, M> graphState,
- MessageStoreByPartition<I, M> messageStore,
+ Mapper<?, ?, ?, ?>.Context context, GraphState graphState,
+ MessageStoreByPartition<I, M1> messageStore,
BlockingQueue<Integer> partitionIdQueue,
- ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
- CentralizedServiceWorker<I, V, E, M> serviceWorker) {
+ ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+ CentralizedServiceWorker<I, V, E> serviceWorker) {
this.context = context;
this.configuration = configuration;
this.partitionIdQueue = partitionIdQueue;
this.messageStore = messageStore;
this.serviceWorker = serviceWorker;
- // Will be replaced later in call() for locality
this.graphState = graphState;
SuperstepMetricsRegistry metrics = GiraphMetrics.get().perSuperstep();
@@ -136,16 +132,12 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
@Override
public Collection<PartitionStats> call() {
// Thread initialization (for locality)
- this.workerClientRequestProcessor =
- new NettyWorkerClientRequestProcessor<I, V, E, M>(
+ WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
+ new NettyWorkerClientRequestProcessor<I, V, E>(
context, configuration, serviceWorker);
WorkerThreadAggregatorUsage aggregatorUsage =
serviceWorker.getAggregatorHandler().newThreadAggregatorUsage();
-
- this.graphState = new GraphState<I, V, E, M>(graphState.getSuperstep(),
- graphState.getTotalNumVertices(), graphState.getTotalNumEdges(),
- context, graphState.getGraphTaskManager(), workerClientRequestProcessor,
- aggregatorUsage);
+ WorkerContext workerContext = serviceWorker.getWorkerContext();
vertexWriter = serviceWorker.getSuperstepOutput().getVertexWriter();
@@ -156,10 +148,18 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
break;
}
- Partition<I, V, E, M> partition =
+ Partition<I, V, E> partition =
serviceWorker.getPartitionStore().getPartition(partitionId);
+
+ Computation<I, V, E, M1, M2> computation =
+ (Computation<I, V, E, M1, M2>) configuration.createComputation();
+ computation.initialize(graphState, workerClientRequestProcessor,
+ serviceWorker.getGraphTaskManager(), aggregatorUsage, workerContext);
+ computation.preSuperstep();
+
try {
- PartitionStats partitionStats = computePartition(partition);
+ PartitionStats partitionStats =
+ computePartition(computation, partition);
partitionStatsList.add(partitionStats);
long partitionMsgs = workerClientRequestProcessor.resetMessageCount();
partitionStats.addMessagesSentCount(partitionMsgs);
@@ -177,6 +177,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
} finally {
serviceWorker.getPartitionStore().putPartition(partition);
}
+
+ computation.postSuperstep();
}
// Return VertexWriter after the usage
@@ -201,29 +203,19 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
/**
* Compute a single partition
*
+ * @param computation Computation to use
* @param partition Partition to compute
* @return Partition stats for this computed partition
*/
- private PartitionStats computePartition(Partition<I, V, E, M> partition)
- throws IOException, InterruptedException {
+ private PartitionStats computePartition(
+ Computation<I, V, E, M1, M2> computation,
+ Partition<I, V, E> partition) throws IOException, InterruptedException {
PartitionStats partitionStats =
new PartitionStats(partition.getId(), 0, 0, 0, 0);
// Make sure this is thread-safe across runs
synchronized (partition) {
- // Prepare Partition context
- WorkerContext workerContext =
- graphState.getGraphTaskManager().getWorkerContext();
- PartitionContext partitionContext = partition.getPartitionContext();
- synchronized (workerContext) {
- partitionContext.preSuperstep(workerContext);
- }
- graphState.setPartitionContext(partition.getPartitionContext());
-
- for (Vertex<I, V, E, M> vertex : partition) {
- // Make sure every vertex has this thread's
- // graphState before computing
- vertex.setGraphState(graphState);
- Iterable<M> messages = messageStore.getVertexMessages(vertex.getId());
+ for (Vertex<I, V, E> vertex : partition) {
+ Iterable<M1> messages = messageStore.getVertexMessages(vertex.getId());
if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
vertex.wakeUp();
}
@@ -231,7 +223,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
context.progress();
TimerContext computeOneTimerContext = computeOneTimer.time();
try {
- vertex.compute(messages);
+ computation.compute(vertex, messages);
} finally {
computeOneTimerContext.stop();
}
@@ -254,10 +246,6 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
}
messageStore.clearPartition(partition.getId());
-
- synchronized (workerContext) {
- partitionContext.postSuperstep(workerContext);
- }
}
return partitionStats;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java
index 52df38d..748c3a6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java
@@ -18,7 +18,7 @@
package org.apache.giraph.graph;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.hadoop.io.Writable;
@@ -32,29 +32,24 @@ import org.apache.log4j.Logger;
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
- * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public class DefaultVertexResolver<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- implements VertexResolver<I, V, E, M>,
- ImmutableClassesGiraphConfigurable<I, V, E, M> {
+ V extends Writable, E extends Writable>
+ extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+ implements VertexResolver<I, V, E> {
/** Class logger */
private static final Logger LOG = Logger.getLogger(
DefaultVertexResolver.class);
- /** Configuration */
- private ImmutableClassesGiraphConfiguration<I, V, E, M> conf = null;
- /** Stored graph state */
- private GraphState<I, V, E, M> graphState;
/** Whether to create vertices when they receive a message */
private boolean createVertexesOnMessages = true;
@Override
- public Vertex<I, V, E, M> resolve(
+ public Vertex<I, V, E> resolve(
I vertexId,
- Vertex<I, V, E, M> vertex,
- VertexChanges<I, V, E, M> vertexChanges,
+ Vertex<I, V, E> vertex,
+ VertexChanges<I, V, E> vertexChanges,
boolean hasMessages) {
// This is the default vertex resolution algorithm
@@ -80,8 +75,8 @@ public class DefaultVertexResolver<I extends WritableComparable,
* @param vertex Vertex to remove edges from
* @param vertexChanges contains list of edges to remove.
*/
- protected void removeEdges(Vertex<I, V, E, M> vertex,
- VertexChanges<I, V, E, M> vertexChanges) {
+ protected void removeEdges(Vertex<I, V, E> vertex,
+ VertexChanges<I, V, E> vertexChanges) {
if (vertex == null) {
return;
}
@@ -101,9 +96,9 @@ public class DefaultVertexResolver<I extends WritableComparable,
* @param vertexChanges specifies if we should remove vertex
* @return null if vertex should be removed, otherwise the vertex itself.
*/
- protected Vertex<I, V, E, M> removeVertexIfDesired(
- Vertex<I, V, E, M> vertex,
- VertexChanges<I, V, E, M> vertexChanges) {
+ protected Vertex<I, V, E> removeVertexIfDesired(
+ Vertex<I, V, E> vertex,
+ VertexChanges<I, V, E> vertexChanges) {
if (hasVertexRemovals(vertexChanges)) {
vertex = null;
}
@@ -120,18 +115,17 @@ public class DefaultVertexResolver<I extends WritableComparable,
* @param hasMessages true if this vertex received any messages
* @return Vertex created or passed in, or null if no vertex should be added
*/
- protected Vertex<I, V, E, M> addVertexIfDesired(
+ protected Vertex<I, V, E> addVertexIfDesired(
I vertexId,
- Vertex<I, V, E, M> vertex,
- VertexChanges<I, V, E, M> vertexChanges,
+ Vertex<I, V, E> vertex,
+ VertexChanges<I, V, E> vertexChanges,
boolean hasMessages) {
if (vertex == null) {
if (hasVertexAdditions(vertexChanges)) {
vertex = vertexChanges.getAddedVertexList().get(0);
} else if ((hasMessages && createVertexesOnMessages) ||
hasEdgeAdditions(vertexChanges)) {
- vertex = conf.createVertex();
- vertex.setGraphState(graphState);
+ vertex = getConf().createVertex();
vertex.initialize(vertexId, getConf().createVertexValue());
}
} else if (hasVertexAdditions(vertexChanges)) {
@@ -148,8 +142,8 @@ public class DefaultVertexResolver<I extends WritableComparable,
* @param vertex Vertex to add edges to
* @param vertexChanges contains edges to add
*/
- protected void addEdges(Vertex<I, V, E, M> vertex,
- VertexChanges<I, V, E, M> vertexChanges) {
+ protected void addEdges(Vertex<I, V, E> vertex,
+ VertexChanges<I, V, E> vertexChanges) {
if (vertex == null) {
return;
}
@@ -166,7 +160,7 @@ public class DefaultVertexResolver<I extends WritableComparable,
* @param changes VertexChanges to check
* @return true if changes contains vertex removal requests
*/
- protected boolean hasVertexRemovals(VertexChanges<I, V, E, M> changes) {
+ protected boolean hasVertexRemovals(VertexChanges<I, V, E> changes) {
return changes != null && changes.getRemovedVertexCount() > 0;
}
@@ -176,7 +170,7 @@ public class DefaultVertexResolver<I extends WritableComparable,
* @param changes VertexChanges to check
* @return true if changes contains vertex addition requests
*/
- protected boolean hasVertexAdditions(VertexChanges<I, V, E, M> changes) {
+ protected boolean hasVertexAdditions(VertexChanges<I, V, E> changes) {
return changes != null && !changes.getAddedVertexList().isEmpty();
}
@@ -186,7 +180,7 @@ public class DefaultVertexResolver<I extends WritableComparable,
* @param changes VertexChanges to check
* @return true if changes contains edge addition requests
*/
- protected boolean hasEdgeAdditions(VertexChanges<I, V, E, M> changes) {
+ protected boolean hasEdgeAdditions(VertexChanges<I, V, E> changes) {
return changes != null && !changes.getAddedEdgeList().isEmpty();
}
@@ -196,28 +190,13 @@ public class DefaultVertexResolver<I extends WritableComparable,
* @param changes VertexChanges to check
* @return true if changes contains edge removal requests
*/
- protected boolean hasEdgeRemovals(VertexChanges<I, V, E, M> changes) {
+ protected boolean hasEdgeRemovals(VertexChanges<I, V, E> changes) {
return changes != null && !changes.getRemovedEdgeList().isEmpty();
}
@Override
- public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
- return conf;
- }
-
- @Override
- public void setConf(ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
- this.conf = conf;
+ public void setConf(ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+ super.setConf(conf);
createVertexesOnMessages = conf.getResolverCreateVertexOnMessages();
}
-
- @Override
- public GraphState<I, V, E, M> getGraphState() {
- return graphState;
- }
-
- @Override
- public void setGraphState(GraphState<I, V, E, M> graphState) {
- this.graphState = graphState;
- }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueFactory.java
index 47902d1..adbe9d3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueFactory.java
@@ -35,7 +35,7 @@ public class DefaultVertexValueFactory<V extends Writable>
@Override
public void initialize(
- ImmutableClassesGiraphConfiguration<?, V, ?, ?> configuration) {
+ ImmutableClassesGiraphConfiguration<?, V, ?> configuration) {
vertexValueClass = configuration.getVertexValueClass();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
index 3c2286d..19034ca 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
@@ -155,12 +155,11 @@ public class GiraphTransferRegulator {
* @param <I> the vertex id type.
* @param <V> the vertex value type.
* @param <E> the edge value type.
- * @param <M> the message value type.
*/
public <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable> void
+ E extends Writable> void
incrementCounters(PartitionOwner partitionOwner,
- Vertex<I, V, E, M> vertex) {
+ Vertex<I, V, E> vertex) {
final int id = partitionOwner.getPartitionId();
// vertex counts
vertexAccumulator
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
index 4181d24..c86a024 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
@@ -36,16 +36,15 @@ import java.io.IOException;
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
- * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public class GraphMapper<I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable> extends
+ E extends Writable> extends
Mapper<Object, Object, Object, Object> {
/** Class logger */
private static final Logger LOG = Logger.getLogger(GraphMapper.class);
/** Manage the framework-agnostic Giraph tasks for this job run */
- private GraphTaskManager<I, V, E, M> graphTaskManager;
+ private GraphTaskManager<I, V, E> graphTaskManager;
@Override
public void setup(Context context)
@@ -56,7 +55,7 @@ public class GraphMapper<I extends WritableComparable, V extends Writable,
// Execute all Giraph-related role(s) assigned to this compute node.
// Roles can include "master," "worker," "zookeeper," or . . . ?
- graphTaskManager = new GraphTaskManager<I, V, E, M>(context);
+ graphTaskManager = new GraphTaskManager<I, V, E>(context);
graphTaskManager.setup(
DistributedCache.getLocalCacheArchives(context.getConfiguration()));
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java
index 93ad5df..ca57008 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java
@@ -15,26 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.giraph.graph;
-import org.apache.giraph.comm.WorkerClientRequestProcessor;
-import org.apache.giraph.partition.PartitionContext;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
/**
* Immutable global state of the graph.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
*/
-@SuppressWarnings("rawtypes")
-public class GraphState<I extends WritableComparable, V extends Writable,
-E extends Writable, M extends Writable> {
+public class GraphState {
/** Graph-wide superstep */
private final long superstep;
/** Graph-wide number of vertices */
@@ -43,15 +32,6 @@ E extends Writable, M extends Writable> {
private final long numEdges;
/** Graph-wide map context */
private final Mapper<?, ?, ?, ?>.Context context;
- /** Graph-wide BSP Mapper for this Vertex */
- private final GraphTaskManager<I, V, E, M> graphTaskManager;
- /** Handles requests */
- private final WorkerClientRequestProcessor<I, V, E, M>
- workerClientRequestProcessor;
- /** Worker aggregator usage */
- private final WorkerAggregatorUsage workerAggregatorUsage;
- /** Partition context */
- private PartitionContext partitionContext;
/**
* Constructor
@@ -60,24 +40,14 @@ E extends Writable, M extends Writable> {
* @param numVertices Current graph-wide vertices
* @param numEdges Current graph-wide edges
* @param context Context
- * @param graphTaskManager GraphTaskManager for this compute node
- * @param workerClientRequestProcessor Handles all communication
- * @param workerAggregatorUsage Aggregator usage
*
*/
- public GraphState(
- long superstep, long numVertices,
- long numEdges, Mapper<?, ?, ?, ?>.Context context,
- GraphTaskManager<I, V, E, M> graphTaskManager,
- WorkerClientRequestProcessor<I, V, E, M> workerClientRequestProcessor,
- WorkerAggregatorUsage workerAggregatorUsage) {
+ public GraphState(long superstep, long numVertices, long numEdges,
+ Mapper<?, ?, ?, ?>.Context context) {
this.superstep = superstep;
this.numVertices = numVertices;
this.numEdges = numEdges;
this.context = context;
- this.graphTaskManager = graphTaskManager;
- this.workerClientRequestProcessor = workerClientRequestProcessor;
- this.workerAggregatorUsage = workerAggregatorUsage;
}
public long getSuperstep() {
@@ -96,33 +66,9 @@ E extends Writable, M extends Writable> {
return context;
}
- public GraphTaskManager<I, V, E, M> getGraphTaskManager() {
- return graphTaskManager;
- }
-
- public WorkerClientRequestProcessor<I, V, E, M>
- getWorkerClientRequestProcessor() {
- return workerClientRequestProcessor;
- }
-
- public WorkerAggregatorUsage getWorkerAggregatorUsage() {
- return workerAggregatorUsage;
- }
-
- public void setPartitionContext(PartitionContext partitionContext) {
- this.partitionContext = partitionContext;
- }
-
- public PartitionContext getPartitionContext() {
- return partitionContext;
- }
-
@Override
public String toString() {
return "(superstep=" + superstep + ",numVertices=" + numVertices + "," +
- "numEdges=" + numEdges + ",context=" + context +
- ",graphMapper=" + graphTaskManager +
- ",workerClientRequestProcessor=" + workerClientRequestProcessor + ")";
-
+ "numEdges=" + numEdges + ",context=" + context + ")";
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/GraphStateAware.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphStateAware.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphStateAware.java
deleted file mode 100644
index 76cef43..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphStateAware.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Interface specifying that the class can be configured with a GraphState.
- *
- * @param <I> Vertex ID object
- * @param <V> Vertex Value object
- * @param <E> Edge object
- * @param <M> Message object
- */
-public interface GraphStateAware<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
- /**
- * Set the graph state.
- *
- * @param graphState Graph state saved.
- */
- void setGraphState(GraphState<I, V, E, M> graphState);
-
- /**
- * Get the graph state stored.
- *
- * @return GraphState stored.
- */
- GraphState<I, V, E, M> getGraphState();
-}