You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2013/10/09 08:35:19 UTC
[3/4] Everything compiles. All tests should run. Next step is to add
a test for the vertex combiner. Should have fixed. Fixed one bug in the byte
array partition. Fixed another bug caused by a message buffer that was too
small. Rebased. Rebased. Passes tests. Need to
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index f97446f..3337621 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -19,13 +19,15 @@ package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.aggregators.TextAggregatorWriter;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.factories.ComputationFactory;
import org.apache.giraph.factories.DefaultComputationFactory;
import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.DefaultVertexValueCombiner;
import org.apache.giraph.graph.DefaultVertexResolver;
+import org.apache.giraph.graph.VertexValueCombiner;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeOutputFormat;
@@ -92,11 +94,14 @@ public class GiraphClasses<I extends WritableComparable,
/** Aggregator writer class - cached for fast access */
protected Class<? extends AggregatorWriter> aggregatorWriterClass;
- /** Combiner class - cached for fast access */
- protected Class<? extends Combiner<I, ? extends Writable>> combinerClass;
+ /** Message combiner class - cached for fast access */
+ protected Class<? extends MessageCombiner<I, ? extends Writable>>
+ messageCombinerClass;
/** Vertex resolver class - cached for fast access */
protected Class<? extends VertexResolver<I, V, E>> vertexResolverClass;
+ /** Vertex value combiner class - cached for fast access */
+ protected Class<? extends VertexValueCombiner<V>> vertexValueCombinerClass;
/** Worker context class - cached for fast access */
protected Class<? extends WorkerContext> workerContextClass;
/** Master compute class - cached for fast access */
@@ -131,6 +136,8 @@ public class GiraphClasses<I extends WritableComparable,
aggregatorWriterClass = TextAggregatorWriter.class;
vertexResolverClass = (Class<? extends VertexResolver<I, V, E>>)
(Object) DefaultVertexResolver.class;
+ vertexValueCombinerClass = (Class<? extends VertexValueCombiner<V>>)
+ (Object) DefaultVertexValueCombiner.class;
workerContextClass = DefaultWorkerContext.class;
masterComputeClass = DefaultMasterCompute.class;
partitionClass = (Class<? extends Partition<I, V, E>>) (Object)
@@ -176,10 +183,13 @@ public class GiraphClasses<I extends WritableComparable,
EDGE_OUTPUT_FORMAT_CLASS.get(conf);
aggregatorWriterClass = AGGREGATOR_WRITER_CLASS.get(conf);
- combinerClass = (Class<? extends Combiner<I, ? extends Writable>>)
- VERTEX_COMBINER_CLASS.get(conf);
+ messageCombinerClass =
+ (Class<? extends MessageCombiner<I, ? extends Writable>>)
+ MESSAGE_COMBINER_CLASS.get(conf);
vertexResolverClass = (Class<? extends VertexResolver<I, V, E>>)
VERTEX_RESOLVER_CLASS.get(conf);
+ vertexValueCombinerClass = (Class<? extends VertexValueCombiner<V>>)
+ VERTEX_VALUE_COMBINER_CLASS.get(conf);
workerContextClass = WORKER_CONTEXT_CLASS.get(conf);
masterComputeClass = MASTER_COMPUTE_CLASS.get(conf);
partitionClass = (Class<? extends Partition<I, V, E>>)
@@ -390,21 +400,22 @@ public class GiraphClasses<I extends WritableComparable,
}
/**
- * Check if Combiner is set
+ * Check if MessageCombiner is set
*
- * @return true if Combiner is set
+ * @return true if MessageCombiner is set
*/
- public boolean hasCombinerClass() {
- return combinerClass != null;
+ public boolean hasMessageCombinerClass() {
+ return messageCombinerClass != null;
}
/**
- * Get Combiner used
+ * Get MessageCombiner used
*
- * @return Combiner
+ * @return MessageCombiner
*/
- public Class<? extends Combiner<I, ? extends Writable>> getCombinerClass() {
- return combinerClass;
+ public Class<? extends MessageCombiner<I, ? extends Writable>>
+ getMessageCombinerClass() {
+ return messageCombinerClass;
}
/**
@@ -426,6 +437,15 @@ public class GiraphClasses<I extends WritableComparable,
}
/**
+ * Get VertexValueCombiner used
+ *
+ * @return VertexValueCombiner
+ */
+ public Class<? extends VertexValueCombiner<V>> getVertexValueCombinerClass() {
+ return vertexValueCombinerClass;
+ }
+
+ /**
* Check if WorkerContext is set
*
* @return true if WorkerContext is set
@@ -639,14 +659,14 @@ public class GiraphClasses<I extends WritableComparable,
}
/**
- * Set Combiner class used
+ * Set MessageCombiner class used
*
- * @param combinerClass Combiner class to set
+ * @param combinerClass MessageCombiner class to set
* @return this
*/
- public GiraphClasses setCombinerClass(
- Class<? extends Combiner<I, ? extends Writable>> combinerClass) {
- this.combinerClass = combinerClass;
+ public GiraphClasses setMessageCombiner(
+ Class<? extends MessageCombiner<I, ? extends Writable>> combinerClass) {
+ this.messageCombinerClass = combinerClass;
return this;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 15ff861..4dee396 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -19,13 +19,14 @@
package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.edge.ReuseObjectsOutEdges;
import org.apache.giraph.factories.ComputationFactory;
+import org.apache.giraph.graph.VertexValueCombiner;
+import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.factories.VertexValueFactory;
import org.apache.giraph.graph.Computation;
-import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeOutputFormat;
import org.apache.giraph.io.VertexInputFormat;
@@ -96,7 +97,7 @@ public class GiraphConfiguration extends Configuration
}
/**
- * Get the user's subclassed {@link org.apache.giraph.graph.Computation}
+ * Get the user's subclassed {@link Computation}
*
* @return User's computation class
*/
@@ -467,22 +468,22 @@ public class GiraphConfiguration extends Configuration
}
/**
- * Get the vertex combiner class (optional)
+ * Get the message combiner class (optional)
*
- * @return vertexCombinerClass Determines how vertex messages are combined
+ * @return messageCombinerClass Determines how vertex messages are combined
*/
- public Class<? extends Combiner> getCombinerClass() {
- return VERTEX_COMBINER_CLASS.get(this);
+ public Class<? extends MessageCombiner> getMessageCombinerClass() {
+ return MESSAGE_COMBINER_CLASS.get(this);
}
/**
- * Set the vertex combiner class (optional)
+ * Set the message combiner class (optional)
*
- * @param vertexCombinerClass Determines how vertex messages are combined
+ * @param messageCombinerClass Determines how vertex messages are combined
*/
- public void setCombinerClass(
- Class<? extends Combiner> vertexCombinerClass) {
- VERTEX_COMBINER_CLASS.set(this, vertexCombinerClass);
+ public void setMessageCombinerClass(
+ Class<? extends MessageCombiner> messageCombinerClass) {
+ MESSAGE_COMBINER_CLASS.set(this, messageCombinerClass);
}
/**
@@ -525,6 +526,16 @@ public class GiraphConfiguration extends Configuration
}
/**
+ * Set the vertex value combiner class (optional)
+ *
+ * @param vertexValueCombinerClass Determines how values of vertices with the same id are combined
+ */
+ public final void setVertexValueCombinerClass(
+ Class<? extends VertexValueCombiner> vertexValueCombinerClass) {
+ VERTEX_VALUE_COMBINER_CLASS.set(this, vertexValueCombinerClass);
+ }
+
+ /**
* Set the worker context class (optional)
*
* @param workerContextClass Determines what code is executed on a each
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 4dadd29..89fce61 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -19,7 +19,7 @@ package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.aggregators.TextAggregatorWriter;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.factories.ComputationFactory;
@@ -34,8 +34,10 @@ import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.factories.VertexIdFactory;
import org.apache.giraph.factories.VertexValueFactory;
import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.DefaultVertexValueCombiner;
import org.apache.giraph.graph.DefaultVertexResolver;
import org.apache.giraph.graph.Language;
+import org.apache.giraph.graph.VertexValueCombiner;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeOutputFormat;
@@ -150,15 +152,20 @@ public interface GiraphConstants {
ClassConfOption<WorkerObserver> WORKER_OBSERVER_CLASSES =
ClassConfOption.create("giraph.worker.observers", null,
WorkerObserver.class, "Classes for Worker Observer - optional");
- /** Vertex combiner class - optional */
- ClassConfOption<Combiner> VERTEX_COMBINER_CLASS =
- ClassConfOption.create("giraph.combinerClass", null, Combiner.class,
- "Vertex combiner class - optional");
+ /** Message combiner class - optional */
+ ClassConfOption<MessageCombiner> MESSAGE_COMBINER_CLASS =
+ ClassConfOption.create("giraph.messageCombinerClass", null,
+ MessageCombiner.class, "Message combiner class - optional");
/** Vertex resolver class - optional */
ClassConfOption<VertexResolver> VERTEX_RESOLVER_CLASS =
ClassConfOption.create("giraph.vertexResolverClass",
DefaultVertexResolver.class, VertexResolver.class,
"Vertex resolver class - optional");
+ /** Vertex value combiner class - optional */
+ ClassConfOption<VertexValueCombiner> VERTEX_VALUE_COMBINER_CLASS =
+ ClassConfOption.create("giraph.vertexValueCombinerClass",
+ DefaultVertexValueCombiner.class, VertexValueCombiner.class,
+ "Vertex value combiner class - optional");
/** Which language computation is implemented in */
EnumConfOption<Language> COMPUTATION_LANGUAGE =
@@ -588,6 +595,20 @@ public interface GiraphConstants {
"request size is M, and a worker has P partitions, than its " +
"initial partition buffer size will be (M / P) * (1 + A).");
+ /** Maximum size of vertices (in bytes) per peer before flush */
+ IntConfOption MAX_VERTEX_REQUEST_SIZE =
+ new IntConfOption("giraph.vertexRequestSize", 512 * ONE_KB,
+ "Maximum size of vertices (in bytes) per peer before flush");
+
+ /**
+ * Additional size (expressed as a ratio) of each per-partition buffer on
+ * top of the average size for vertices.
+ */
+ FloatConfOption ADDITIONAL_VERTEX_REQUEST_SIZE =
+ new FloatConfOption("giraph.additionalVertexRequestSize", 0.2f,
+ "Additional size (expressed as a ratio) of each per-partition " +
+ "buffer on top of the average size.");
+
/** Maximum size of edges (in bytes) per peer before flush */
IntConfOption MAX_EDGE_REQUEST_SIZE =
new IntConfOption("giraph.edgeRequestSize", 512 * ONE_KB,
@@ -595,7 +616,7 @@ public interface GiraphConstants {
/**
* Additional size (expressed as a ratio) of each per-partition buffer on
- * top of the average size.
+ * top of the average size for edges.
*/
FloatConfOption ADDITIONAL_EDGE_REQUEST_SIZE =
new FloatConfOption("giraph.additionalEdgeRequestSize", 0.2f,
@@ -665,9 +686,9 @@ public interface GiraphConstants {
LongConfOption INPUT_SPLIT_MAX_VERTICES =
new LongConfOption("giraph.InputSplitMaxVertices", -1,
"To limit outlier vertex input splits from producing too many " +
- "vertices or to help with testing, the number of vertices loaded " +
- "from an input split can be limited. By default, everything is " +
- "loaded.");
+ "vertices or to help with testing, the number of vertices " +
+ "loaded from an input split can be limited. By default, " +
+ "everything is loaded.");
/**
* To limit outlier vertex input splits from producing too many vertices or
@@ -677,9 +698,9 @@ public interface GiraphConstants {
LongConfOption INPUT_SPLIT_MAX_EDGES =
new LongConfOption("giraph.InputSplitMaxEdges", -1,
"To limit outlier vertex input splits from producing too many " +
- "vertices or to help with testing, the number of edges loaded " +
- "from an input split can be limited. By default, everything is " +
- "loaded.");
+ "vertices or to help with testing, the number of edges loaded " +
+ "from an input split can be limited. By default, everything is " +
+ "loaded.");
/**
* To minimize network usage when reading input splits,
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 435dfa5..6bb6c00 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -19,7 +19,7 @@
package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.edge.OutEdges;
@@ -34,6 +34,7 @@ import org.apache.giraph.graph.Computation;
import org.apache.giraph.graph.DefaultVertex;
import org.apache.giraph.graph.Language;
import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexValueCombiner;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeOutputFormat;
@@ -87,8 +88,7 @@ import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
*/
@SuppressWarnings("unchecked")
public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
- V extends Writable, E extends Writable>
- extends GiraphConfiguration {
+ V extends Writable, E extends Writable> extends GiraphConfiguration {
/** Holder for all the classes */
private final GiraphClasses classes;
/** Value (IVEMM) Factories */
@@ -429,12 +429,14 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
- * Get the user's subclassed {@link Combiner} class.
+ * Get the user's subclassed
+ * {@link org.apache.giraph.combiner.MessageCombiner} class.
*
* @return User's combiner class
*/
- public Class<? extends Combiner<I, ? extends Writable>> getCombinerClass() {
- return classes.getCombinerClass();
+ public Class<? extends MessageCombiner<I, ? extends Writable>>
+ getMessageCombinerClass() {
+ return classes.getMessageCombinerClass();
}
/**
@@ -444,8 +446,9 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
* @return Instantiated user combiner class
*/
@SuppressWarnings("rawtypes")
- public <M extends Writable> Combiner<I, M> createCombiner() {
- Class<? extends Combiner<I, M>> klass = classes.getCombinerClass();
+ public <M extends Writable> MessageCombiner<I, M> createMessageCombiner() {
+ Class<? extends MessageCombiner<I, M>> klass =
+ classes.getMessageCombinerClass();
return ReflectionUtils.newInstance(klass, this);
}
@@ -454,8 +457,29 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
*
* @return True iff user set a combiner class
*/
- public boolean useCombiner() {
- return classes.hasCombinerClass();
+ public boolean useMessageCombiner() {
+ return classes.hasMessageCombinerClass();
+ }
+
+ /**
+ * Get the user's subclassed
+ * {@link org.apache.giraph.graph.VertexValueCombiner} class.
+ *
+ * @return User's vertex value combiner class
+ */
+ public Class<? extends VertexValueCombiner<V>>
+ getVertexValueCombinerClass() {
+ return classes.getVertexValueCombinerClass();
+ }
+
+ /**
+ * Create an instance of the user's vertex value combiner class
+ *
+ * @return Instance of the configured vertex value combiner class
+ */
+ @SuppressWarnings("rawtypes")
+ public VertexValueCombiner<V> createVertexValueCombiner() {
+ return ReflectionUtils.newInstance(getVertexValueCombinerClass(), this);
}
/**
@@ -979,7 +1003,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
- * Update Computation and Combiner class used
+ * Update Computation and MessageCombiner class used
*
* @param superstepClasses SuperstepClasses
*/
@@ -999,6 +1023,6 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
(Class<? extends Writable>) classList[4];
classes.setOutgoingMessageValueClass(outgoingMsgValueClass);
}
- classes.setCombinerClass(superstepClasses.getCombinerClass());
+ classes.setMessageCombiner(superstepClasses.getMessageCombinerClass());
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
index 23df689..1694d36 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
@@ -181,7 +181,7 @@ public class EdgeStore<I extends WritableComparable,
Integer partitionId;
while ((partitionId = partitionIdQueue.poll()) != null) {
Partition<I, V, E> partition =
- service.getPartitionStore().getPartition(partitionId);
+ service.getPartitionStore().getOrCreatePartition(partitionId);
ConcurrentMap<I, OutEdges<I, E>> partitionEdges =
transientEdges.remove(partitionId);
for (I vertexId : partitionEdges.keySet()) {
@@ -196,7 +196,15 @@ public class EdgeStore<I extends WritableComparable,
outEdges);
partition.putVertex(vertex);
} else {
- vertex.setEdges(outEdges);
+ // A vertex may exist with or without edges initially
+ // and optimize the case of no initial edges
+ if (vertex.getNumEdges() == 0) {
+ vertex.setEdges(outEdges);
+ } else {
+ for (Edge<I, E> edge : outEdges) {
+ vertex.addEdge(edge);
+ }
+ }
// Some Partition implementations (e.g. ByteArrayPartition)
// require us to put back the vertex after modifying it.
partition.saveVertex(vertex);
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 77d9f5e..1fe1d10 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -153,7 +153,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
}
Partition<I, V, E> partition =
- serviceWorker.getPartitionStore().getPartition(partitionId);
+ serviceWorker.getPartitionStore().getOrCreatePartition(partitionId);
Computation<I, V, E, M1, M2> computation =
(Computation<I, V, E, M1, M2>) configuration.createComputation();
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueCombiner.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueCombiner.java
new file mode 100644
index 0000000..4dc6384
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueCombiner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Default vertex value combiner: leaves the original vertex value unchanged,
+ * i.e. the value being combined into it is ignored.
+ *
+ * @param <I> Vertex id (not used by this implementation)
+ * @param <V> Vertex data
+ * @param <E> Edge data (not used by this implementation)
+ */
+public class DefaultVertexValueCombiner<I extends WritableComparable,
+ V extends Writable, E extends Writable>
+ implements VertexValueCombiner<V> {
+ @Override
+ public void combine(V originalVertexValue,
+ V vertexValue) {
+ // Intentionally empty: originalVertexValue is kept, vertexValue dropped
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueCombiner.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueCombiner.java
new file mode 100644
index 0000000..7891434
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueCombiner.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * When several vertex values with the same vertex id are loaded, this
+ * interface specifies how to combine them into a single value. Edges loaded
+ * will be added to the EdgeStore.
+ *
+ * @param <V> Vertex data
+ */
+public interface VertexValueCombiner<V extends Writable> {
+ /**
+ * Combine a vertex value into the original vertex value
+ * by modifying originalVertexValue in place.
+ *
+ * @param originalVertexValue Value that receives the combined result
+ * @param vertexValue Value to combine into originalVertexValue
+ */
+ void combine(V originalVertexValue, V vertexValue);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextVertexInputFormat.java
new file mode 100644
index 0000000..1af5b73
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextVertexInputFormat.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Simple text-based {@link org.apache.giraph.io.VertexInputFormat} for
+ * unweighted graphs with int ids and int vertex values.
+ *
+ * Each line consists of: vertex_id vertex_value neighbor1 neighbor2 ...
+ */
+public class IntIntNullTextVertexInputFormat
+ extends
+ TextVertexInputFormat<IntWritable, IntWritable, NullWritable> {
+ /** Token separator within a line: a single tab or space character */
+ private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
+
+ @Override
+ public TextVertexReader createVertexReader(InputSplit split,
+ TaskAttemptContext context)
+ throws IOException {
+ return new IntIntNullVertexReader();
+ }
+
+ /**
+ * Vertex reader associated with
+ * {@link org.apache.giraph.io.formats.IntIntNullTextVertexInputFormat}.
+ */
+ public class IntIntNullVertexReader extends
+ TextVertexReaderFromEachLineProcessed<String[]> {
+ /** Vertex id parsed from token 0 of the current line */
+ private IntWritable id;
+ /** Vertex value parsed from token 1 of the current line */
+ private IntWritable value;
+
+ @Override
+ protected String[] preprocessLine(Text line) throws IOException {
+ String[] tokens = SEPARATOR.split(line.toString());
+ id = new IntWritable(Integer.parseInt(tokens[0]));
+ value = new IntWritable(Integer.parseInt(tokens[1]));
+ return tokens;
+ }
+
+ @Override
+ protected IntWritable getId(String[] tokens) throws IOException {
+ return id;
+ }
+
+ @Override
+ protected IntWritable getValue(String[] tokens) throws IOException {
+ return value;
+ }
+
+ @Override
+ protected Iterable<Edge<IntWritable, NullWritable>> getEdges(
+ String[] tokens) throws IOException {
+ List<Edge<IntWritable, NullWritable>> edges =
+ Lists.newArrayListWithCapacity(tokens.length - 2); // tokens 2.. are neighbors
+ for (int n = 2; n < tokens.length; n++) {
+ edges.add(EdgeFactory.create(
+ new IntWritable(Integer.parseInt(tokens[n]))));
+ }
+ return edges;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
index 6a795a8..ac7f5b7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
@@ -85,16 +85,11 @@ public abstract class TextVertexValueInputFormat<I extends WritableComparable,
* Create the line record reader. Override this to use a different
* underlying record reader (useful for testing).
*
- * @param inputSplit
- * the split to read
- * @param context
- * the context passed to initialize
- * @return
- * the record reader to be used
- * @throws IOException
- * exception that can be thrown during creation
- * @throws InterruptedException
- * exception that can be thrown during creation
+ * @param inputSplit the split to read
+ * @param context the context passed to initialize
+ * @return the record reader to be used
+ * @throws IOException exception that can be thrown during creation
+ * @throws InterruptedException exception that can be thrown during creation
*/
protected RecordReader<LongWritable, Text>
createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
@@ -157,22 +152,17 @@ public abstract class TextVertexValueInputFormat<I extends WritableComparable,
/**
* Reads vertex id from the current line.
*
- * @param line
- * the current line
- * @return
- * the vertex id corresponding to the line
- * @throws IOException
- * exception that can be thrown while reading
+ * @param line the current line
+ * @return the vertex id corresponding to the line
+ * @throws IOException exception that can be thrown while reading
*/
protected abstract I getId(Text line) throws IOException;
/**
* Reads vertex value from the current line.
*
- * @param line
- * the current line
- * @return
- * the vertex value corresponding to the line
+ * @param line the current line
+ * @return the vertex value corresponding to the line
* @throws IOException
* exception that can be thrown while reading
*/
@@ -183,8 +173,7 @@ public abstract class TextVertexValueInputFormat<I extends WritableComparable,
* Abstract class to be implemented by the user to read a vertex value from
* each text line after preprocessing it.
*
- * @param <T>
- * The resulting type of preprocessing.
+ * @param <T> The resulting type of preprocessing.
*/
protected abstract class TextVertexValueReaderFromEachLineProcessed<T>
extends TextVertexValueReader {
@@ -226,12 +215,9 @@ public abstract class TextVertexValueInputFormat<I extends WritableComparable,
* Preprocess the line so other methods can easily read necessary
* information for creating vertex.
*
- * @param line
- * the current line to be read
- * @return
- * the preprocessed object
- * @throws IOException
- * exception that can be thrown while reading
+ * @param line the current line to be read
+ * @return the preprocessed object
+ * @throws IOException exception that can be thrown while reading
*/
protected abstract T preprocessLine(Text line) throws IOException;
@@ -240,22 +226,17 @@ public abstract class TextVertexValueInputFormat<I extends WritableComparable,
*
* @param line
* the object obtained by preprocessing the line
- * @return
- * the vertex id
- * @throws IOException
- * exception that can be thrown while reading
+ * @return the vertex id
+ * @throws IOException exception that can be thrown while reading
*/
protected abstract I getId(T line) throws IOException;
/**
* Reads vertex value from the preprocessed line.
*
- * @param line
- * the object obtained by preprocessing the line
- * @return
- * the vertex value
- * @throws IOException
- * exception that can be thrown while reading
+ * @param line the object obtained by preprocessing the line
+ * @return the vertex value
+ * @throws IOException exception that can be thrown while reading
*/
protected abstract V getValue(T line) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
index 5b870c5..fcb5b87 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
@@ -18,13 +18,14 @@
package org.apache.giraph.job;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.factories.DefaultVertexValueFactory;
import org.apache.giraph.factories.VertexValueFactory;
import org.apache.giraph.graph.DefaultVertexResolver;
+import org.apache.giraph.graph.VertexValueCombiner;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.VertexInputFormat;
@@ -73,6 +74,8 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
private static final int EDGE_PARAM_OUT_EDGES_INDEX = 1;
/** V param vertex value factory index in classList */
private static final int VALUE_PARAM_VERTEX_VALUE_FACTORY_INDEX = 0;
+ /** V param vertex value combiner index in classList */
+ private static final int VALUE_PARAM_VERTEX_VALUE_COMBINER_INDEX = 0;
/**
* The Configuration object for use in the validation test.
@@ -138,7 +141,8 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
verifyEdgeInputFormatGenericTypes();
verifyVertexOutputFormatGenericTypes();
verifyVertexResolverGenericTypes();
- verifyVertexCombinerGenericTypes();
+ verifyVertexValueCombinerGenericTypes();
+ verifyMessageCombinerGenericTypes();
verifyVertexValueFactoryGenericTypes();
}
@@ -240,17 +244,35 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
}
}
- /** If there is a combiner type, verify its generic params match the job. */
- private void verifyVertexCombinerGenericTypes() {
- Class<? extends Combiner<I, M2>> vertexCombinerClass =
- conf.getCombinerClass();
- if (vertexCombinerClass != null) {
+ /**
+ * If there is a vertex value combiner type, verify its
+ * generic params match the job.
+ */
+ private void verifyVertexValueCombinerGenericTypes() {
+ Class<? extends VertexValueCombiner<V>> vertexValueCombiner =
+ conf.getVertexValueCombinerClass();
+ if (vertexValueCombiner != null) {
+ Class<?>[] classList =
+ getTypeArguments(VertexValueCombiner.class, vertexValueCombiner);
+ checkAssignable(classList, VALUE_PARAM_VERTEX_VALUE_COMBINER_INDEX,
+ vertexValueType(), VertexValueCombiner.class, "vertex value");
+ }
+ }
+
+ /**
+ * If there is a message combiner type, verify its
+ * generic params match the job.
+ */
+ private void verifyMessageCombinerGenericTypes() {
+ Class<? extends MessageCombiner<I, M2>> messageCombinerClass =
+ conf.getMessageCombinerClass();
+ if (messageCombinerClass != null) {
Class<?>[] classList =
- getTypeArguments(Combiner.class, vertexCombinerClass);
- checkEquals(classList, ID_PARAM_INDEX, vertexIndexType(), Combiner.class,
- "vertex index");
+ getTypeArguments(MessageCombiner.class, messageCombinerClass);
+ checkEquals(classList, ID_PARAM_INDEX, vertexIndexType(),
+ MessageCombiner.class, "vertex index");
checkEquals(classList, MSG_COMBINER_PARAM_INDEX,
- outgoingMessageValueType(), Combiner.class, "message value");
+ outgoingMessageValueType(), MessageCombiner.class, "message value");
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/jython/JythonJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/JythonJob.java b/giraph-core/src/main/java/org/apache/giraph/jython/JythonJob.java
index 6b2eedf..c7e9eae 100644
--- a/giraph-core/src/main/java/org/apache/giraph/jython/JythonJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/JythonJob.java
@@ -17,7 +17,7 @@
*/
package org.apache.giraph.jython;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
@@ -281,8 +281,8 @@ public class JythonJob {
private final TypeHolder message_value = new TypeHolder();
/** Computation class */
private String computation_name;
- /** Combiner class */
- private Class<? extends Combiner> combiner;
+ /** MessageCombiner class */
+ private Class<? extends MessageCombiner> messageCombiner;
/** Java options */
private final List<String> java_options = Lists.newArrayList();
/** Giraph options */
@@ -342,12 +342,13 @@ public class JythonJob {
return giraph_options;
}
- public Class<? extends Combiner> getCombiner() {
- return combiner;
+ public Class<? extends MessageCombiner> getMessageCombiner() {
+ return messageCombiner;
}
- public void setCombiner(Class<? extends Combiner> combiner) {
- this.combiner = combiner;
+ public void setMessageCombiner(
+ Class<? extends MessageCombiner> messageCombiner) {
+ this.messageCombiner = messageCombiner;
}
public String getComputation_name() {
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
index cf7356c..287fdb9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -18,9 +18,9 @@
package org.apache.giraph.master;
-import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.aggregators.Aggregator;
import org.apache.giraph.graph.Computation;
import org.apache.giraph.graph.GraphState;
import org.apache.hadoop.io.Writable;
@@ -48,7 +48,10 @@ public abstract class MasterCompute
private MasterAggregatorUsage masterAggregatorUsage;
/** Graph state */
private GraphState graphState;
- /** Computation and Combiner class used, which can be switched by master */
+ /**
+ * Computation and MessageCombiner classes used, which can be
+ * switched by master
+ */
private SuperstepClasses superstepClasses;
/**
@@ -143,26 +146,27 @@ public abstract class MasterCompute
}
/**
- * Set Combiner class to be used
+ * Set MessageCombiner class to be used
*
- * @param combinerClass Combiner class
+ * @param combinerClass MessageCombiner class
*/
- public final void setCombiner(Class<? extends Combiner> combinerClass) {
- superstepClasses.setCombinerClass(combinerClass);
+ public final void setMessageCombiner(
+ Class<? extends MessageCombiner> combinerClass) {
+ superstepClasses.setMessageCombinerClass(combinerClass);
}
/**
- * Get Combiner class to be used
+ * Get MessageCombiner class to be used
*
- * @return Combiner class
+ * @return MessageCombiner class
*/
- public final Class<? extends Combiner> getCombiner() {
+ public final Class<? extends MessageCombiner> getMessageCombiner() {
// Might be called prior to classes being set, do not return NPE
if (superstepClasses == null) {
return null;
}
- return superstepClasses.getCombinerClass();
+ return superstepClasses.getMessageCombinerClass();
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
index 7a7df05..8344910 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
@@ -18,7 +18,7 @@
package org.apache.giraph.master;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.TypesHolder;
import org.apache.giraph.graph.Computation;
@@ -35,13 +35,13 @@ import java.lang.reflect.Modifier;
import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_LANGUAGE;
/**
- * Holds Computation and Combiner class.
+ * Holds Computation and MessageCombiner class.
*/
public class SuperstepClasses implements Writable {
/** Computation class to be used in the following superstep */
private Class<? extends Computation> computationClass;
- /** Combiner class to be used in the following superstep */
- private Class<? extends Combiner> combinerClass;
+ /** MessageCombiner class to be used in the following superstep */
+ private Class<? extends MessageCombiner> messageCombinerClass;
/**
* Default constructor
@@ -56,27 +56,28 @@ public class SuperstepClasses implements Writable {
*/
@SuppressWarnings("unchecked")
public SuperstepClasses(ImmutableClassesGiraphConfiguration conf) {
- this(conf.getComputationClass(), conf.getCombinerClass());
+ this(conf.getComputationClass(), conf.getMessageCombinerClass());
}
/**
* Constructor
*
* @param computationClass Computation class
- * @param combinerClass Combiner class
+ * @param messageCombinerClass MessageCombiner class
*/
public SuperstepClasses(Class<? extends Computation> computationClass,
- Class<? extends Combiner> combinerClass) {
+ Class<? extends MessageCombiner> messageCombinerClass) {
this.computationClass = computationClass;
- this.combinerClass = combinerClass;
+ this.messageCombinerClass =
+ messageCombinerClass;
}
public Class<? extends Computation> getComputationClass() {
return computationClass;
}
- public Class<? extends Combiner> getCombinerClass() {
- return combinerClass;
+ public Class<? extends MessageCombiner> getMessageCombinerClass() {
+ return messageCombinerClass;
}
public void setComputationClass(
@@ -84,13 +85,15 @@ public class SuperstepClasses implements Writable {
this.computationClass = computationClass;
}
- public void setCombinerClass(Class<? extends Combiner> combinerClass) {
- this.combinerClass = combinerClass;
+ public void setMessageCombinerClass(
+ Class<? extends MessageCombiner> messageCombinerClass) {
+ this.messageCombinerClass =
+ messageCombinerClass;
}
/**
- * Verify that types of current Computation and Combiner are valid. If types
- * don't match an {@link IllegalStateException} will be thrown.
+ * Verify that types of current Computation and MessageCombiner are valid.
+ * If types don't match an {@link IllegalStateException} will be thrown.
*
* @param conf Configuration to verify this with
* @param checkMatchingMesssageTypes Check that the incoming/outgoing
@@ -128,13 +131,13 @@ public class SuperstepClasses implements Writable {
throw new IllegalStateException("verifyTypesMatch: " +
"Message type can't be abstract class" + outgoingMessageType);
}
- if (combinerClass != null) {
+ if (messageCombinerClass != null) {
Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments(
- Combiner.class, combinerClass);
+ MessageCombiner.class, messageCombinerClass);
verifyTypes(conf.getVertexIdClass(), combinerTypes[0],
- "Vertex id", combinerClass);
+ "Vertex id", messageCombinerClass);
verifyTypes(outgoingMessageType, combinerTypes[1],
- "Outgoing message", combinerClass);
+ "Outgoing message", messageCombinerClass);
}
}
@@ -160,13 +163,13 @@ public class SuperstepClasses implements Writable {
@Override
public void write(DataOutput output) throws IOException {
WritableUtils.writeClass(computationClass, output);
- WritableUtils.writeClass(combinerClass, output);
+ WritableUtils.writeClass(messageCombinerClass, output);
}
@Override
public void readFields(DataInput input) throws IOException {
computationClass = WritableUtils.readClass(input);
- combinerClass = WritableUtils.readClass(input);
+ messageCombinerClass = WritableUtils.readClass(input);
}
@Override
@@ -174,6 +177,7 @@ public class SuperstepClasses implements Writable {
String computationName = computationClass == null ? "_not_set_" :
computationClass.getName();
return "(computation=" + computationName + ",combiner=" +
- ((combinerClass == null) ? "null" : combinerClass.getName()) + ")";
+ ((messageCombinerClass == null) ? "null" :
+ messageCombinerClass.getName()) + ")";
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
index f2b8552..ec8a7d7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
@@ -19,6 +19,8 @@
package org.apache.giraph.partition;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.VertexValueCombiner;
+import org.apache.giraph.utils.VertexIterator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;
@@ -44,11 +46,14 @@ public abstract class BasicPartition<I extends WritableComparable,
private int id;
/** Context used to report progress */
private Progressable progressable;
+ /** Vertex value combiner */
+ private VertexValueCombiner<V> vertexValueCombiner;
@Override
public void initialize(int partitionId, Progressable progressable) {
setId(partitionId);
setProgressable(progressable);
+ vertexValueCombiner = conf.createVertexValueCombiner();
}
@Override
@@ -84,6 +89,21 @@ public abstract class BasicPartition<I extends WritableComparable,
this.progressable = progressable;
}
+ public VertexValueCombiner<V> getVertexValueCombiner() {
+ return vertexValueCombiner;
+ }
+
+ @Override
+ public void addPartitionVertices(VertexIterator<I, V, E> vertexIterator) {
+ while (vertexIterator.hasNext()) {
+ vertexIterator.next();
+ // Release the vertex if it was put, otherwise reuse as an optimization
+ if (putOrCombine(vertexIterator.getVertex())) {
+ vertexIterator.releaseVertex();
+ }
+ }
+ }
+
@Override
public void write(DataOutput output) throws IOException {
output.writeInt(id);
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
index 6eaa6d7..cef39cd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
@@ -17,6 +17,9 @@
*/
package org.apache.giraph.partition;
+import com.google.common.collect.MapMaker;
+import com.google.common.primitives.Ints;
+import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.giraph.utils.WritableUtils;
@@ -24,9 +27,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;
-import com.google.common.collect.MapMaker;
-import com.google.common.primitives.Ints;
-
+import javax.annotation.concurrent.NotThreadSafe;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -43,6 +44,7 @@ import java.util.concurrent.ConcurrentMap;
* @param <V> Vertex value
* @param <E> Edge value
*/
+@NotThreadSafe
public class ByteArrayPartition<I extends WritableComparable,
V extends Writable, E extends Writable>
extends BasicPartition<I, V, E>
@@ -55,6 +57,8 @@ public class ByteArrayPartition<I extends WritableComparable,
private ConcurrentMap<I, byte[]> vertexMap;
/** Representative vertex */
private Vertex<I, V, E> representativeVertex;
+ /** Representative combiner vertex */
+ private Vertex<I, V, E> representativeCombinerVertex;
/** Use unsafe serialization */
private boolean useUnsafeSerialization;
@@ -73,6 +77,11 @@ public class ByteArrayPartition<I extends WritableComparable,
getConf().createVertexId(),
getConf().createVertexValue(),
getConf().createOutEdges());
+ representativeCombinerVertex = getConf().createVertex();
+ representativeCombinerVertex.initialize(
+ getConf().createVertexId(),
+ getConf().createVertexValue(),
+ getConf().createOutEdges());
useUnsafeSerialization = getConf().useUnsafeSerialization();
}
@@ -125,8 +134,66 @@ public class ByteArrayPartition<I extends WritableComparable,
(ByteArrayPartition<I, V, E>) partition;
for (Map.Entry<I, byte[]> entry :
byteArrayPartition.vertexMap.entrySet()) {
- vertexMap.put(entry.getKey(), entry.getValue());
+
+ byte[] oldVertexBytes =
+ vertexMap.putIfAbsent(entry.getKey(), entry.getValue());
+ if (oldVertexBytes == null) {
+ continue;
+ }
+
+ // Note that vertex combining is going to be expensive compared to
+ // SimplePartition since here we have to deserialize the vertices,
+ // combine them, and then reserialize them. If the vertex doesn't exist,
+ // just add the new vertex as a byte[]
+ synchronized (this) {
+ // Combine the vertex values
+ WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes,
+ representativeVertex, useUnsafeSerialization, getConf());
+ WritableUtils.reinitializeVertexFromByteArray(entry.getValue(),
+ representativeCombinerVertex, useUnsafeSerialization, getConf());
+ getVertexValueCombiner().combine(representativeVertex.getValue(),
+ representativeCombinerVertex.getValue());
+
+ // Add the edges to the representative vertex
+ for (Edge<I, E> edge : representativeCombinerVertex.getEdges()) {
+ representativeVertex.addEdge(edge);
+ }
+
+ // Store the merged result (representativeVertex); serializing
+ // representativeCombinerVertex would drop the combined value/edges
+ byte[] vertexData = WritableUtils.writeVertexToByteArray(
+ representativeVertex, useUnsafeSerialization, getConf());
+ vertexMap.put(entry.getKey(), vertexData);
+ }
+ }
+ }
+
+ @Override
+ public synchronized boolean putOrCombine(Vertex<I, V, E> vertex) {
+ // Optimistically try to first put and then combine if this fails
+ byte[] vertexData =
+ WritableUtils.writeVertexToByteArray(
+ vertex, useUnsafeSerialization, getConf());
+ byte[] oldVertexBytes = vertexMap.putIfAbsent(vertex.getId(), vertexData);
+ if (oldVertexBytes == null) {
+ return true;
}
+
+ // Combine the new value into the existing vertex
+ WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes,
+ representativeVertex, useUnsafeSerialization, getConf());
+ getVertexValueCombiner().combine(representativeVertex.getValue(),
+ vertex.getValue());
+
+ // Add the new vertex's edges, consistent with addPartition()
+ for (Edge<I, E> edge : vertex.getEdges()) {
+ representativeVertex.addEdge(edge);
+ }
+
+ vertexMap.put(vertex.getId(),
+ WritableUtils.writeVertexToByteArray(
+ representativeVertex, useUnsafeSerialization, getConf()));
+ return false;
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
index 110ce9d..c37efd5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -222,15 +222,31 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
}
@Override
- public Partition<I, V, E> getPartition(Integer id) {
+ public Partition<I, V, E> getOrCreatePartition(Integer id) {
+ // NOTE(review): wLock is held while blocking on pool tasks; confirm
+ // GetPartition/AddPartition never take wLock/rLock (would deadlock).
+ wLock.lock();
try {
- return pool.submit(new GetPartition(id)).get();
+ Partition<I, V, E> partition =
+ pool.submit(new GetPartition(id)).get();
+ if (partition == null) {
+ Partition<I, V, E> newPartition =
+ conf.createPartition(id, context);
+ pool.submit(
+ new AddPartition(id, newPartition)).get();
+ return newPartition;
+ } else {
+ return partition;
+ }
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new IllegalStateException(
- "getPartition: cannot retrieve partition " + id, e);
+ "getOrCreatePartition: cannot retrieve partition " + id, e);
} catch (ExecutionException e) {
throw new IllegalStateException(
- "getPartition: cannot retrieve partition " + id, e);
+ "getOrCreatePartition: cannot retrieve partition " + id, e);
+ } finally {
+ wLock.unlock();
}
}
@@ -263,7 +276,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
@Override
public Partition<I, V, E> removePartition(Integer id) {
- Partition<I, V, E> partition = getPartition(id);
+ Partition<I, V, E> partition = getOrCreatePartition(id);
// we put it back, so the partition can turn INACTIVE and be deleted.
putPartition(partition);
deletePartition(id);
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java b/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
index b6b9551..479abcc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
@@ -20,6 +20,7 @@ package org.apache.giraph.partition;
import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.VertexIterator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;
@@ -70,13 +71,32 @@ public interface Partition<I extends WritableComparable,
Vertex<I, V, E> removeVertex(I vertexIndex);
/**
- * Add a partition's vertices
+ * Add a partition's vertices. If a vertex to be added doesn't exist,
+ * add it. If the vertex already exists, use the
+ * VertexValueCombiner to combine them.
*
* @param partition Partition to add
*/
void addPartition(Partition<I, V, E> partition);
/**
+ * Put this vertex or combine it
+ *
+ * @param vertex Vertex to put or combine
+ * @return True if the vertex was put (hint to release object)
+ */
+ boolean putOrCombine(Vertex<I, V, E> vertex);
+
+ /**
+ * Add vertices to a partition. If a vertex to be added doesn't exist,
+ * add it. If the vertex already exists, use the
+ * VertexValueCombiner to combine them.
+ *
+ * @param vertexIterator Vertices to add
+ */
+ void addPartitionVertices(VertexIterator<I, V, E> vertexIterator);
+
+ /**
* Get the number of vertices in this partition
*
* @return Number of vertices
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
index 763397e..fdc20a5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.io.WritableComparable;
*/
public abstract class PartitionStore<I extends WritableComparable,
V extends Writable, E extends Writable> {
-
/**
* Add a new partition to the store or just the vertices from the partition
* to the old partition.
@@ -40,17 +39,18 @@ public abstract class PartitionStore<I extends WritableComparable,
public abstract void addPartition(Partition<I, V, E> partition);
/**
- * Get a partition. Note: user has to put back it to the store through
- * {@link #putPartition(Partition)} after use.
+ * Get or create a partition. Note: user has to put back
+ * it to the store through {@link #putPartition(Partition)} after use.
*
* @param partitionId Partition id
- * @return The requested partition
+ * @return The requested partition (never null)
*/
- public abstract Partition<I, V, E> getPartition(Integer partitionId);
+ public abstract Partition<I, V, E> getOrCreatePartition(Integer partitionId);
/**
* Put a partition back to the store. Use this method to be put a partition
- * back after it has been retrieved through {@link #getPartition(Integer)}.
+ * back after it has been retrieved through
+ * {@link #getOrCreatePartition(Integer)}.
*
* @param partition Partition
*/
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
index 0c1b404..1609846 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
@@ -18,14 +18,15 @@
package org.apache.giraph.partition;
+import com.google.common.collect.Maps;
+import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;
-import com.google.common.collect.Maps;
-
+import javax.annotation.concurrent.ThreadSafe;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -43,6 +44,7 @@ import static org.apache.giraph.conf.GiraphConstants.USE_OUT_OF_CORE_MESSAGES;
* @param <V> Vertex data
* @param <E> Edge data
*/
+@ThreadSafe
@SuppressWarnings("rawtypes")
public class SimplePartition<I extends WritableComparable,
V extends Writable, E extends Writable>
@@ -81,9 +83,34 @@ public class SimplePartition<I extends WritableComparable,
}
@Override
+ public boolean putOrCombine(Vertex<I, V, E> vertex) {
+ Vertex<I, V, E> originalVertex = vertexMap.get(vertex.getId());
+ if (originalVertex == null) {
+ originalVertex =
+ vertexMap.putIfAbsent(vertex.getId(), vertex);
+ if (originalVertex == null) {
+ return true;
+ }
+ }
+ // A vertex with this id is already in the map: merge under its monitor
+ synchronized (originalVertex) {
+ // Combine the incoming vertex value into the existing vertex value
+ getVertexValueCombiner().combine(
+ originalVertex.getValue(), vertex.getValue());
+
+ // Add the incoming vertex's edges to the existing (original) vertex
+ for (Edge<I, E> edge : vertex.getEdges()) {
+ originalVertex.addEdge(edge);
+ }
+ }
+ // false: the incoming vertex was merged, not put into the map
+ return false;
+ }
+
+ @Override
public void addPartition(Partition<I, V, E> partition) {
for (Vertex<I, V, E> vertex : partition) {
- vertexMap.put(vertex.getId(), vertex);
+ putOrCombine(vertex);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
index ae17aac..79c18c3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
@@ -18,13 +18,12 @@
package org.apache.giraph.partition;
+import com.google.common.collect.Maps;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
-import com.google.common.collect.Maps;
-
import java.util.concurrent.ConcurrentMap;
/**
@@ -67,12 +66,22 @@ public class SimplePartitionStore<I extends WritableComparable,
return;
}
}
+ // This is thread-safe
oldPartition.addPartition(partition);
}
@Override
- public Partition<I, V, E> getPartition(Integer partitionId) {
- return partitions.get(partitionId);
+ public Partition<I, V, E> getOrCreatePartition(Integer partitionId) {
+ Partition<I, V, E> oldPartition = partitions.get(partitionId);
+ if (oldPartition == null) {
+ Partition<I, V, E> newPartition =
+ conf.createPartition(partitionId, context);
+ oldPartition = partitions.putIfAbsent(partitionId, newPartition);
+ if (oldPartition == null) {
+ return newPartition;
+ }
+ }
+ return oldPartition;
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
index 7e2b73b..4958ae3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
@@ -55,7 +55,7 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
* deserializd right away, so this won't help.
*/
private void setUseMessageSizeEncoding() {
- if (!getConf().useCombiner()) {
+ if (!getConf().useMessageCombiner()) {
useMessageSizeEncoding = getConf().useMessageSizeEncoding();
} else {
useMessageSizeEncoding = false;
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index 4bc4f4d..e441f03 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -17,6 +17,8 @@
*/
package org.apache.giraph.utils;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -25,7 +27,7 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.giraph.Algorithm;
import org.apache.giraph.aggregators.AggregatorWriter;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.GiraphTypes;
@@ -35,6 +37,7 @@ import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.factories.VertexValueFactory;
import org.apache.giraph.graph.Computation;
import org.apache.giraph.graph.Language;
+import org.apache.giraph.graph.VertexValueCombiner;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeOutputFormat;
import org.apache.giraph.io.VertexInputFormat;
@@ -54,9 +57,6 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.zookeeper.ZooKeeper;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-
import java.io.IOException;
import java.util.List;
@@ -108,7 +108,9 @@ public final class ConfigurationUtils {
"for the vertex output");
OPTIONS.addOption("esd", "edgeSubDir", true, "subdirectory to be used " +
"for the edge output");
- OPTIONS.addOption("c", "combiner", true, "Combiner class");
+ OPTIONS.addOption("c", "combiner", true, "MessageCombiner class");
+ OPTIONS.addOption("vc", "vertexValueCombiner", true,
+ "VertexValueCombiner class");
OPTIONS.addOption("ve", "outEdges", true, "Vertex edges class");
OPTIONS.addOption("wc", "workerContext", true, "WorkerContext class");
OPTIONS.addOption("aw", "aggregatorWriter", true, "AggregatorWriter class");
@@ -277,8 +279,14 @@ public final class ConfigurationUtils {
TYPES_HOLDER_CLASS.set(conf, typesHolderClass);
}
if (cmd.hasOption("c")) {
- conf.setCombinerClass(
- (Class<? extends Combiner>) Class.forName(cmd.getOptionValue("c")));
+ conf.setMessageCombinerClass(
+ (Class<? extends MessageCombiner>)
+ Class.forName(cmd.getOptionValue("c")));
+ }
+ if (cmd.hasOption("vc")) {
+ conf.setVertexValueCombinerClass(
+ (Class<? extends VertexValueCombiner>)
+ Class.forName(cmd.getOptionValue("vc")));
}
if (cmd.hasOption("ve")) {
conf.setOutEdgesClass(
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java
new file mode 100644
index 0000000..dced9bd
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Iterates over vertices stored in an ExtendedDataOutput such that
+ * the ownership of the vertex id (or of the whole vertex) can be
+ * transferred to another object. This optimization cuts down on the
+ * number of objects instantiated and garbage collected.
+ *
+ * <p>A single {@link Vertex} object is reused across calls to
+ * {@link #next()} and overwritten in place, unless the caller took
+ * ownership of it (or of its id) in between.</p>
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public class VertexIterator<I extends WritableComparable,
+    V extends Writable, E extends Writable> {
+  /** Reader of the serialized vertices */
+  private final ExtendedDataInput extendedDataInput;
+  /** Current vertex; null after releaseVertex() until next() is called */
+  private Vertex<I, V, E> vertex;
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
+
+  /**
+   * Constructor.
+   *
+   * @param extendedDataOutput Extended data output containing the
+   *                           serialized vertices
+   * @param configuration Configuration
+   */
+  public VertexIterator(
+      ExtendedDataOutput extendedDataOutput,
+      ImmutableClassesGiraphConfiguration<I, V, E> configuration) {
+    extendedDataInput = configuration.createExtendedDataInput(
+        extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+    this.configuration = configuration;
+    resetEmptyVertex();
+  }
+
+  /**
+   * Reset the empty Vertex to an initial state with a fresh id, value
+   * and out-edges.
+   */
+  private void resetEmptyVertex() {
+    vertex = configuration.createVertex();
+    I id = configuration.createVertexId();
+    V value = configuration.createVertexValue();
+    OutEdges<I, E> edges = configuration.createOutEdges();
+    vertex.initialize(id, value, edges);
+  }
+
+  /**
+   * Returns true if the iteration has more elements.
+   *
+   * @return True if the iteration has more elements.
+   */
+  public boolean hasNext() {
+    return extendedDataInput.available() > 0;
+  }
+
+  /**
+   * Moves to the next element in the iteration, deserializing it into the
+   * reused vertex object. If the caller took ownership of the vertex or of
+   * its id, a replacement vertex or id is created first.
+   *
+   * @throws IllegalStateException if reading the serialized vertex fails
+   */
+  public void next() {
+    // If the vertex was released, create another one
+    if (vertex == null) {
+      resetEmptyVertex();
+    }
+
+    // If the vertex id was released, create another one
+    if (vertex.getId() == null) {
+      vertex.initialize(configuration.createVertexId(), vertex.getValue());
+    }
+
+    try {
+      WritableUtils.reinitializeVertexFromDataInput(
+          extendedDataInput, vertex, configuration);
+    } catch (IOException e) {
+      throw new IllegalStateException("next: IOException", e);
+    }
+  }
+
+  /**
+   * Get the current vertex id. This object's contents are only guaranteed
+   * until next() is called. To take ownership of this object call
+   * releaseCurrentVertexId() after getting a reference to this object.
+   *
+   * @return Current vertex id, or null if the id or the whole vertex has
+   *         been released since the last call to next()
+   */
+  public I getCurrentVertexId() {
+    return vertex == null ? null : vertex.getId();
+  }
+
+  /**
+   * The backing store of the current vertex id is now released.
+   * Further calls to getCurrentVertexId() without calling next()
+   * will return null.
+   *
+   * @return Current vertex id that was released
+   * @throws IllegalStateException if the whole vertex was already released
+   */
+  public I releaseCurrentVertexId() {
+    if (vertex == null) {
+      throw new IllegalStateException(
+          "releaseCurrentVertexId: Vertex was already released");
+    }
+    I releasedVertexId = vertex.getId();
+    vertex.initialize(null, vertex.getValue());
+    return releasedVertexId;
+  }
+
+  /**
+   * Get the current vertex. This object is reused, so its contents are
+   * only guaranteed until next() is called. To take ownership of it call
+   * releaseVertex().
+   *
+   * @return Current vertex, or null if it was released since the last
+   *         call to next()
+   */
+  public Vertex<I, V, E> getVertex() {
+    return vertex;
+  }
+
+  /**
+   * Release the ownership of the Vertex object to the caller. The next
+   * call to next() creates a fresh Vertex.
+   *
+   * @return Released Vertex object
+   */
+  public Vertex<I, V, E> releaseVertex() {
+    Vertex<I, V, E> releasedVertex = vertex;
+    vertex = null;
+    return releasedVertex;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index 9163c08..3f8382e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -530,7 +530,7 @@ public class WritableUtils {
}
/**
- * Reads data from input stream to inizialize Vertex.
+ * Reads data from input stream to initialize Vertex.
*
* @param input The input stream
* @param conf Configuration
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 112b76d..a92ddf8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -571,7 +571,7 @@ public class BspServiceWorker<I extends WritableComparable,
new ArrayList<PartitionStats>();
for (Integer partitionId : getPartitionStore().getPartitionIds()) {
Partition<I, V, E> partition =
- getPartitionStore().getPartition(partitionId);
+ getPartitionStore().getOrCreatePartition(partitionId);
PartitionStats partitionStats =
new PartitionStats(partition.getId(),
partition.getVertexCount(),
@@ -974,7 +974,7 @@ public class BspServiceWorker<I extends WritableComparable,
}
Partition<I, V, E> partition =
- getPartitionStore().getPartition(partitionId);
+ getPartitionStore().getOrCreatePartition(partitionId);
long verticesWritten = 0;
for (Vertex<I, V, E> vertex : partition) {
vertexWriter.writeVertex(vertex);
@@ -1082,7 +1082,7 @@ public class BspServiceWorker<I extends WritableComparable,
}
Partition<I, V, E> partition =
- getPartitionStore().getPartition(partitionId);
+ getPartitionStore().getOrCreatePartition(partitionId);
long vertices = 0;
long edges = 0;
long partitionEdgeCount = partition.getEdgeCount();
@@ -1241,7 +1241,7 @@ public class BspServiceWorker<I extends WritableComparable,
DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
for (Integer partitionId : getPartitionStore().getPartitionIds()) {
Partition<I, V, E> partition =
- getPartitionStore().getPartition(partitionId);
+ getPartitionStore().getOrCreatePartition(partitionId);
long startPos = verticesOutputStream.getPos();
partition.write(verticesOutputStream);
// write messages
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index 115c108..fcdfa5c 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -129,7 +129,7 @@ public class RequestTest {
assertTrue(partitionStore.hasPartition(partitionId));
int total = 0;
Partition<IntWritable, IntWritable, IntWritable> partition2 =
- partitionStore.getPartition(partitionId);
+ partitionStore.getOrCreatePartition(partitionId);
for (Vertex<IntWritable, IntWritable, IntWritable> vertex : partition2) {
total += vertex.getId().get();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
index a8f6f70..1fe3a25 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
@@ -19,7 +19,7 @@
package org.apache.giraph.comm.messages;
import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.combiner.FloatSumCombiner;
+import org.apache.giraph.combiner.FloatSumMessageCombiner;
import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore;
import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore;
import org.apache.giraph.conf.GiraphConfiguration;
@@ -70,8 +70,8 @@ public class TestIntFloatPrimitiveMessageStores {
Lists.newArrayList(0, 1));
Partition partition = Mockito.mock(Partition.class);
Mockito.when(partition.getVertexCount()).thenReturn(Long.valueOf(1));
- Mockito.when(partitionStore.getPartition(0)).thenReturn(partition);
- Mockito.when(partitionStore.getPartition(1)).thenReturn(partition);
+ Mockito.when(partitionStore.getOrCreatePartition(0)).thenReturn(partition);
+ Mockito.when(partitionStore.getOrCreatePartition(1)).thenReturn(partition);
}
private static class IntFloatNoOpComputation extends
@@ -122,7 +122,7 @@ public class TestIntFloatPrimitiveMessageStores {
@Test
public void testIntFloatMessageStore() throws IOException {
IntFloatMessageStore messageStore =
- new IntFloatMessageStore(service, new FloatSumCombiner());
+ new IntFloatMessageStore(service, new FloatSumMessageCombiner());
insertIntFloatMessages(messageStore);
Iterable<FloatWritable> m0 =
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
index 0659260..a04b703 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
@@ -19,7 +19,7 @@
package org.apache.giraph.comm.messages;
import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.combiner.DoubleSumCombiner;
+import org.apache.giraph.combiner.DoubleSumMessageCombiner;
import org.apache.giraph.comm.messages.primitives.LongByteArrayMessageStore;
import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore;
import org.apache.giraph.conf.GiraphConfiguration;
@@ -70,8 +70,8 @@ public class TestLongDoublePrimitiveMessageStores {
Lists.newArrayList(0, 1));
Partition partition = Mockito.mock(Partition.class);
Mockito.when(partition.getVertexCount()).thenReturn(Long.valueOf(1));
- Mockito.when(partitionStore.getPartition(0)).thenReturn(partition);
- Mockito.when(partitionStore.getPartition(1)).thenReturn(partition);
+ Mockito.when(partitionStore.getOrCreatePartition(0)).thenReturn(partition);
+ Mockito.when(partitionStore.getOrCreatePartition(1)).thenReturn(partition);
}
private static class LongDoubleNoOpComputation extends
@@ -122,7 +122,7 @@ public class TestLongDoublePrimitiveMessageStores {
@Test
public void testLongDoubleMessageStore() throws IOException {
LongDoubleMessageStore messageStore =
- new LongDoubleMessageStore(service, new DoubleSumCombiner());
+ new LongDoubleMessageStore(service, new DoubleSumMessageCombiner());
insertLongDoubleMessages(messageStore);
Iterable<DoubleWritable> m0 =