You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2013/03/20 21:08:28 UTC
git commit: GIRAPH-547: Allow in-place modification of edges (apresta)
Updated Branches:
refs/heads/trunk 40bc599b8 -> 5bb956cad
GIRAPH-547: Allow in-place modification of edges (apresta)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/5bb956ca
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/5bb956ca
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/5bb956ca
Branch: refs/heads/trunk
Commit: 5bb956cad2d9c4beded036553f715f0e3156d665
Parents: 40bc599
Author: Alessandro Presta <al...@fb.com>
Authored: Wed Mar 13 19:18:39 2013 -0700
Committer: Alessandro Presta <al...@fb.com>
Committed: Wed Mar 20 13:08:17 2013 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../conf/ImmutableClassesGiraphConfiguration.java | 29 ++-
.../org/apache/giraph/edge/ArrayListEdges.java | 11 +-
.../org/apache/giraph/edge/ByteArrayEdges.java | 4 +-
.../java/org/apache/giraph/edge/DefaultEdge.java | 2 +-
.../java/org/apache/giraph/edge/EdgeFactory.java | 14 +-
.../java/org/apache/giraph/edge/EdgeNoValue.java | 2 +-
.../java/org/apache/giraph/edge/EdgeStore.java | 3 +-
.../java/org/apache/giraph/edge/HashMapEdges.java | 35 ++-
.../org/apache/giraph/edge/HashMultimapEdges.java | 4 +-
.../apache/giraph/edge/LongDoubleArrayEdges.java | 76 ++++++-
.../apache/giraph/edge/LongDoubleHashMapEdges.java | 80 ++++++-
.../org/apache/giraph/edge/LongNullArrayEdges.java | 44 +++--
.../apache/giraph/edge/LongNullHashSetEdges.java | 30 ++-
.../org/apache/giraph/edge/MapMutableEdge.java | 61 +++++
.../java/org/apache/giraph/edge/MutableEdge.java | 12 +-
.../apache/giraph/edge/MutableEdgesIterable.java | 102 ++++++++
.../apache/giraph/edge/MutableEdgesWrapper.java | 183 +++++++++++++++
.../org/apache/giraph/edge/MutableVertexEdges.java | 42 ++++
.../java/org/apache/giraph/edge/ReusableEdge.java | 43 ++++
.../giraph/edge/StrictRandomAccessVertexEdges.java | 9 +
.../org/apache/giraph/graph/ComputeCallable.java | 2 +
.../main/java/org/apache/giraph/graph/Vertex.java | 66 ++++++
.../giraph/utils/ByteArrayVertexIdEdges.java | 2 +-
.../org/apache/giraph/utils/PairListWritable.java | 76 ------
.../org/apache/giraph/edge/TestNullValueEdges.java | 46 ++++-
.../giraph/edge/TestStrictRandomAccessEdges.java | 3 +
.../apache/giraph/graph/TestVertexAndEdges.java | 169 +++++++++++++-
.../giraph/hive/input/edge/HiveEdgeReader.java | 10 +-
29 files changed, 1002 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 462f104..d1983d0 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-547: Allow in-place modification of edges (apresta)
+
GIRAPH-537: Fix log messages produced by aggregators (majakabiljo)
GIRAPH-480: Add convergence detection to org.apache.giraph.examples.RandomWalkVertex (ssc)
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 7075999..4fedc46 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -20,6 +20,8 @@ package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.edge.ReusableEdge;
+import org.apache.giraph.edge.ReuseObjectsVertexEdges;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.io.EdgeInputFormat;
@@ -40,7 +42,6 @@ import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
-import org.apache.giraph.edge.MutableEdge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.edge.VertexEdges;
import org.apache.giraph.worker.WorkerContext;
@@ -508,15 +509,15 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
- * Create a mutable user edge.
+ * Create a reusable edge.
*
- * @return Instantiated mutable user edge.
+ * @return Instantiated reusable edge.
*/
- public MutableEdge<I, E> createMutableEdge() {
+ public ReusableEdge<I, E> createReusableEdge() {
if (isEdgeValueNullWritable()) {
- return (MutableEdge<I, E>) EdgeFactory.createMutable(createVertexId());
+ return (ReusableEdge<I, E>) EdgeFactory.createReusable(createVertexId());
} else {
- return EdgeFactory.createMutable(createVertexId(), createEdgeValue());
+ return EdgeFactory.createReusable(createVertexId(), createEdgeValue());
}
}
@@ -562,6 +563,22 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
+ * True if the {@link VertexEdges} implementation copies the passed edges
+ * to its own data structure, i.e. it doesn't keep references to Edge
+ * objects, target vertex ids or edge values passed to add() or
+ * initialize().
+ * This makes it possible to reuse edge objects passed to the data
+ * structure, to minimize object instantiation (see for example
+ * EdgeStore#addPartitionEdges()).
+ *
+ * @return True iff we can reuse the edge objects
+ */
+ public boolean reuseEdgeObjects() {
+ return ReuseObjectsVertexEdges.class.isAssignableFrom(
+ getVertexEdgesClass());
+ }
+
+ /**
* Create a user {@link VertexEdges}
*
* @return Instantiated user VertexEdges
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java
index 98b1aef..dda7568 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java
@@ -37,7 +37,8 @@ import java.util.Iterator;
* @param <E> Edge value
*/
public class ArrayListEdges<I extends WritableComparable, E extends Writable>
- extends ConfigurableVertexEdges<I, E> {
+ extends ConfigurableVertexEdges<I, E>
+ implements MutableVertexEdges<I, E> {
/** List of edges. */
private ArrayList<Edge<I, E>> edgeList;
@@ -89,6 +90,14 @@ public class ArrayListEdges<I extends WritableComparable, E extends Writable>
}
@Override
+ @SuppressWarnings("unchecked")
+ public Iterator<MutableEdge<I, E>> mutableIterator() {
+ // The downcast is fine because all concrete Edge implementations are
+ // mutable, but we only expose the mutation functionality when appropriate.
+ return (Iterator) iterator();
+ }
+
+ @Override
public void write(DataOutput out) throws IOException {
out.writeInt(edgeList.size());
for (Edge<I, E> edge : edgeList) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
index 6201d25..16748ef 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
@@ -142,8 +142,8 @@ public class ByteArrayEdges<I extends WritableComparable, E extends Writable>
getConf().createExtendedDataInput(
serializedEdges, 0, serializedEdgesBytesUsed);
/** Representative edge object. */
- private MutableEdge<I, E> representativeEdge =
- getConf().createMutableEdge();
+ private ReusableEdge<I, E> representativeEdge =
+ getConf().createReusableEdge();
@Override
public boolean hasNext() {
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/DefaultEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/DefaultEdge.java b/giraph-core/src/main/java/org/apache/giraph/edge/DefaultEdge.java
index 461bff3..d7cfb2f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/DefaultEdge.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/DefaultEdge.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.io.WritableComparable;
*/
@SuppressWarnings("rawtypes")
public class DefaultEdge<I extends WritableComparable, E extends Writable>
- implements MutableEdge<I, E> {
+ implements ReusableEdge<I, E> {
/** Target vertex id */
private I targetVertexId = null;
/** Edge value */
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/EdgeFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeFactory.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeFactory.java
index 3599207..c0d1a0c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeFactory.java
@@ -41,7 +41,7 @@ public class EdgeFactory {
public static <I extends WritableComparable,
E extends Writable>
Edge<I, E> create(I id, E value) {
- return createMutable(id, value);
+ return createReusable(id, value);
}
/**
@@ -53,11 +53,11 @@ public class EdgeFactory {
*/
public static <I extends WritableComparable>
Edge<I, NullWritable> create(I id) {
- return createMutable(id);
+ return createReusable(id);
}
/**
- * Create a mutable edge pointing to a given ID with a value
+ * Create a reusable edge pointing to a given ID with a value
*
* @param id target ID
* @param value edge value
@@ -67,23 +67,23 @@ public class EdgeFactory {
*/
public static <I extends WritableComparable,
E extends Writable>
- MutableEdge<I, E> createMutable(I id, E value) {
+ ReusableEdge<I, E> createReusable(I id, E value) {
if (value instanceof NullWritable) {
- return (MutableEdge<I, E>) createMutable(id);
+ return (ReusableEdge<I, E>) createReusable(id);
} else {
return new DefaultEdge<I, E>(id, value);
}
}
/**
- * Create a mutable edge pointing to a given ID with a value
+ * Create a reusable edge pointing to a given ID with a value
*
* @param id target ID
* @param <I> Vertex ID type
* @return Edge pointing to ID with value
*/
public static <I extends WritableComparable>
- MutableEdge<I, NullWritable> createMutable(I id) {
+ ReusableEdge<I, NullWritable> createReusable(I id) {
return new EdgeNoValue<I>(id);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/EdgeNoValue.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeNoValue.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeNoValue.java
index dd22aec..306a413 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeNoValue.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeNoValue.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.io.WritableComparable;
* @param <I> Vertex ID
*/
public class EdgeNoValue<I extends WritableComparable>
- implements MutableEdge<I, NullWritable> {
+ implements ReusableEdge<I, NullWritable> {
/** Target vertex id */
private I targetVertexId = null;
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
index 1f6e9bb..234c267 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
@@ -75,8 +75,7 @@ public class EdgeStore<I extends WritableComparable,
this.progressable = progressable;
transientEdges = new MapMaker().concurrencyLevel(
configuration.getNettyServerExecutionConcurrency()).makeMap();
- reuseEdgeObjects = ReuseObjectsVertexEdges.class.isAssignableFrom(
- configuration.getVertexEdgesClass());
+ reuseEdgeObjects = configuration.reuseEdgeObjects();
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java
index 2600992..9fa7b64 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java
@@ -19,7 +19,6 @@
package org.apache.giraph.edge;
import com.google.common.collect.Maps;
-import com.google.common.collect.UnmodifiableIterator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -42,7 +41,8 @@ import java.util.Map;
*/
public class HashMapEdges<I extends WritableComparable, E extends Writable>
extends ConfigurableVertexEdges<I, E>
- implements StrictRandomAccessVertexEdges<I, E> {
+ implements StrictRandomAccessVertexEdges<I, E>,
+ MutableVertexEdges<I, E> {
/** Map from target vertex id to edge value. */
private HashMap<I, E> edgeMap;
@@ -86,6 +86,13 @@ public class HashMapEdges<I extends WritableComparable, E extends Writable>
}
@Override
+ public void setEdgeValue(I targetVertexId, E edgeValue) {
+ if (edgeMap.containsKey(targetVertexId)) {
+ edgeMap.put(targetVertexId, edgeValue);
+ }
+ }
+
+ @Override
public int size() {
return edgeMap.size();
}
@@ -93,13 +100,20 @@ public class HashMapEdges<I extends WritableComparable, E extends Writable>
@Override
public Iterator<Edge<I, E>> iterator() {
// Returns an iterator that reuses objects.
- return new UnmodifiableIterator<Edge<I, E>>() {
+ // The downcast is fine because all concrete Edge implementations are
+ // mutable, but we only expose the mutation functionality when appropriate.
+ return (Iterator) mutableIterator();
+ }
+
+ @Override
+ public Iterator<MutableEdge<I, E>> mutableIterator() {
+ return new Iterator<MutableEdge<I, E>>() {
/** Wrapped map iterator. */
private Iterator<Map.Entry<I, E>> mapIterator =
edgeMap.entrySet().iterator();
/** Representative edge object. */
- private MutableEdge<I, E> representativeEdge =
- getConf().createMutableEdge();
+ private MapMutableEdge<I, E> representativeEdge =
+ new MapMutableEdge<I, E>();
@Override
public boolean hasNext() {
@@ -107,12 +121,15 @@ public class HashMapEdges<I extends WritableComparable, E extends Writable>
}
@Override
- public Edge<I, E> next() {
- Map.Entry<I, E> nextEntry = mapIterator.next();
- representativeEdge.setTargetVertexId(nextEntry.getKey());
- representativeEdge.setValue(nextEntry.getValue());
+ public MutableEdge<I, E> next() {
+ representativeEdge.setEntry(mapIterator.next());
return representativeEdge;
}
+
+ @Override
+ public void remove() {
+ mapIterator.remove();
+ }
};
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/HashMultimapEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/HashMultimapEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/HashMultimapEdges.java
index 143d7a4..123d49f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/HashMultimapEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/HashMultimapEdges.java
@@ -111,8 +111,8 @@ public class HashMultimapEdges<I extends WritableComparable, E extends Writable>
private Iterator<Map.Entry<I, E>> mapIterator =
edgeMultimap.entries().iterator();
/** Representative edge object. */
- private MutableEdge<I, E> representativeEdge =
- getConf().createMutableEdge();
+ private ReusableEdge<I, E> representativeEdge =
+ getConf().createReusableEdge();
@Override
public boolean hasNext() {
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java
index f164484..0487d92 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java
@@ -41,7 +41,8 @@ import java.util.Iterator;
*/
public class LongDoubleArrayEdges
extends ConfigurableVertexEdges<LongWritable, DoubleWritable>
- implements ReuseObjectsVertexEdges<LongWritable, DoubleWritable> {
+ implements ReuseObjectsVertexEdges<LongWritable, DoubleWritable>,
+ MutableVertexEdges<LongWritable, DoubleWritable> {
/** Array of target vertex ids. */
private LongArrayList neighbors;
/** Array of edge values. */
@@ -97,7 +98,7 @@ public class LongDoubleArrayEdges
*
* @param i Position of edge to be removed
*/
- private void remove(int i) {
+ private void removeAt(int i) {
// The order of the edges is irrelevant, so we can simply replace
// the deleted edge with the rightmost element, thus achieving constant
// time.
@@ -108,18 +109,19 @@ public class LongDoubleArrayEdges
neighbors.set(i, neighbors.popLong());
edgeValues.set(i, edgeValues.popDouble());
}
+ // If needed after the removal, trim the arrays.
+ trim();
}
@Override
public void remove(LongWritable targetVertexId) {
- // Thanks to the constant-time implementation of remove(int),
+ // Thanks to the constant-time implementation of removeAt(int),
// we can remove all matching edges in linear time.
for (int i = neighbors.size() - 1; i >= 0; --i) {
if (neighbors.get(i) == targetVertexId.get()) {
- remove(i);
+ removeAt(i);
}
}
- trim();
}
@Override
@@ -153,6 +155,70 @@ public class LongDoubleArrayEdges
};
}
+ /** Helper class for a mutable edge that modifies the backing arrays. */
+ private class LongDoubleArrayMutableEdge
+ extends DefaultEdge<LongWritable, DoubleWritable> {
+ /** Index of the edge in the backing arrays. */
+ private int index;
+
+ /** Constructor. */
+ public LongDoubleArrayMutableEdge() {
+ super(new LongWritable(), new DoubleWritable());
+ }
+
+ /**
+ * Make the edge point to the given index in the backing arrays.
+ *
+ * @param index Index in the arrays
+ */
+ public void setIndex(int index) {
+ // Update the id and value objects from the superclass.
+ getTargetVertexId().set(neighbors.get(index));
+ getValue().set(edgeValues.get(index));
+ // Update the index.
+ this.index = index;
+ }
+
+ @Override
+ public void setValue(DoubleWritable value) {
+ // Update the value object from the superclass.
+ getValue().set(value.get());
+ // Update the value stored in the backing array.
+ edgeValues.set(index, value.get());
+ }
+ }
+
+ @Override
+ public Iterator<MutableEdge<LongWritable, DoubleWritable>>
+ mutableIterator() {
+ return new Iterator<MutableEdge<LongWritable, DoubleWritable>>() {
+ /** Current position in the array. */
+ private int offset = 0;
+ /** Representative edge object. */
+ private LongDoubleArrayMutableEdge representativeEdge =
+ new LongDoubleArrayMutableEdge();
+
+ @Override
+ public boolean hasNext() {
+ return offset < neighbors.size();
+ }
+
+ @Override
+ public MutableEdge<LongWritable, DoubleWritable> next() {
+ representativeEdge.setIndex(offset++);
+ return representativeEdge;
+ }
+
+ @Override
+ public void remove() {
+ // Since removeAt() might replace the deleted edge with the last edge
+ // in the array, we need to decrease the offset so that the latter
+ // won't be skipped.
+ removeAt(--offset);
+ }
+ };
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(neighbors.size());
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java
index 68bd85f..867a356 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java
@@ -42,7 +42,8 @@ import java.util.Iterator;
public class LongDoubleHashMapEdges
extends ConfigurableVertexEdges<LongWritable, DoubleWritable>
implements StrictRandomAccessVertexEdges<LongWritable, DoubleWritable>,
- ReuseObjectsVertexEdges<LongWritable, DoubleWritable> {
+ ReuseObjectsVertexEdges<LongWritable, DoubleWritable>,
+ MutableVertexEdges<LongWritable, DoubleWritable> {
/** Hash map from target vertex id to edge value. */
private Long2DoubleOpenHashMap edgeMap;
/** Representative edge value object, used by getEdgeValue(). */
@@ -96,6 +97,14 @@ public class LongDoubleHashMapEdges
}
@Override
+ public void setEdgeValue(LongWritable targetVertexId,
+ DoubleWritable edgeValue) {
+ if (edgeMap.containsKey(targetVertexId.get())) {
+ edgeMap.put(targetVertexId.get(), edgeValue.get());
+ }
+ }
+
+ @Override
public int size() {
return edgeMap.size();
}
@@ -108,8 +117,8 @@ public class LongDoubleHashMapEdges
private ObjectIterator<Long2DoubleMap.Entry> mapIterator =
edgeMap.long2DoubleEntrySet().fastIterator();
/** Representative edge object. */
- private MutableEdge<LongWritable, DoubleWritable> representativeEdge =
- getConf().createMutableEdge();
+ private ReusableEdge<LongWritable, DoubleWritable> representativeEdge =
+ getConf().createReusableEdge();
@Override
public boolean hasNext() {
@@ -126,6 +135,71 @@ public class LongDoubleHashMapEdges
};
}
+ /** Helper class for a mutable edge that modifies the backing map entry. */
+ private static class LongDoubleHashMapMutableEdge
+ extends DefaultEdge<LongWritable, DoubleWritable> {
+ /** Backing entry for the edge in the map. */
+ private Long2DoubleMap.Entry entry;
+
+ /** Constructor. */
+ public LongDoubleHashMapMutableEdge() {
+ super(new LongWritable(), new DoubleWritable());
+ }
+
+ /**
+ * Make the edge point to the given entry in the backing map.
+ *
+ * @param entry Backing entry
+ */
+ public void setEntry(Long2DoubleMap.Entry entry) {
+ // Update the id and value objects from the superclass.
+ getTargetVertexId().set(entry.getLongKey());
+ getValue().set(entry.getValue());
+ // Update the entry.
+ this.entry = entry;
+ }
+
+ @Override
+ public void setValue(DoubleWritable value) {
+ // Update the value object from the superclass.
+ getValue().set(value.get());
+ // Update the value stored in the backing map.
+ entry.setValue(value.get());
+ }
+ }
+
+ @Override
+ public Iterator<MutableEdge<LongWritable, DoubleWritable>> mutableIterator() {
+ return new Iterator<MutableEdge<LongWritable, DoubleWritable>>() {
+ /**
+ * Wrapped map iterator.
+ * Note: we cannot use the fast iterator in this case,
+ * because we need to call setValue() on an entry.
+ */
+ private ObjectIterator<Long2DoubleMap.Entry> mapIterator =
+ edgeMap.long2DoubleEntrySet().iterator();
+ /** Representative edge object. */
+ private LongDoubleHashMapMutableEdge representativeEdge =
+ new LongDoubleHashMapMutableEdge();
+
+ @Override
+ public boolean hasNext() {
+ return mapIterator.hasNext();
+ }
+
+ @Override
+ public MutableEdge<LongWritable, DoubleWritable> next() {
+ representativeEdge.setEntry(mapIterator.next());
+ return representativeEdge;
+ }
+
+ @Override
+ public void remove() {
+ mapIterator.remove();
+ }
+ };
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(edgeMap.size());
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java
index 528acb2..de4e310 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java
@@ -18,7 +18,6 @@
package org.apache.giraph.edge;
-import com.google.common.collect.UnmodifiableIterator;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongIterator;
import org.apache.hadoop.io.LongWritable;
@@ -39,7 +38,8 @@ import java.util.Iterator;
*/
public class LongNullArrayEdges
extends ConfigurableVertexEdges<LongWritable, NullWritable>
- implements ReuseObjectsVertexEdges<LongWritable, NullWritable> {
+ implements ReuseObjectsVertexEdges<LongWritable, NullWritable>,
+ MutableVertexEdges<LongWritable, NullWritable> {
/** Array of target vertex ids. */
private LongArrayList neighbors;
@@ -89,7 +89,7 @@ public class LongNullArrayEdges
*
* @param i Position of edge to be removed
*/
- private void remove(int i) {
+ private void removeAt(int i) {
// The order of the edges is irrelevant, so we can simply replace
// the deleted edge with the rightmost element, thus achieving constant
// time.
@@ -98,18 +98,19 @@ public class LongNullArrayEdges
} else {
neighbors.set(i, neighbors.popLong());
}
+ // If needed after the removal, trim the array.
+ trim();
}
@Override
public void remove(LongWritable targetVertexId) {
- // Thanks to the constant-time implementation of remove(int),
+ // Thanks to the constant-time implementation of removeAt(int),
// we can remove all matching edges in linear time.
for (int i = neighbors.size() - 1; i >= 0; --i) {
if (neighbors.get(i) == targetVertexId.get()) {
- remove(i);
+ removeAt(i);
}
}
- trim();
}
@Override
@@ -120,23 +121,38 @@ public class LongNullArrayEdges
@Override
public Iterator<Edge<LongWritable, NullWritable>> iterator() {
// Returns an iterator that reuses objects.
- return new UnmodifiableIterator<Edge<LongWritable, NullWritable>>() {
- /** Wrapped neighbors iterator. */
- private LongIterator neighborsIt = neighbors.iterator();
+ // The downcast is fine because all concrete Edge implementations are
+ // mutable, but we only expose the mutation functionality when appropriate.
+ return (Iterator) mutableIterator();
+ }
+
+ @Override
+ public Iterator<MutableEdge<LongWritable, NullWritable>> mutableIterator() {
+ return new Iterator<MutableEdge<LongWritable, NullWritable>>() {
+ /** Current position in the array. */
+ private int offset = 0;
/** Representative edge object. */
- private Edge<LongWritable, NullWritable> representativeEdge =
- getConf().createEdge();
+ private MutableEdge<LongWritable, NullWritable> representativeEdge =
+ getConf().createReusableEdge();
@Override
public boolean hasNext() {
- return neighborsIt.hasNext();
+ return offset < neighbors.size();
}
@Override
- public Edge<LongWritable, NullWritable> next() {
- representativeEdge.getTargetVertexId().set(neighborsIt.nextLong());
+ public MutableEdge<LongWritable, NullWritable> next() {
+ representativeEdge.getTargetVertexId().set(neighbors.get(offset++));
return representativeEdge;
}
+
+ @Override
+ public void remove() {
+ // Since removeAt() might replace the deleted edge with the last edge
+ // in the array, we need to decrease the offset so that the latter
+ // won't be skipped.
+ removeAt(--offset);
+ }
};
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java
index 26c57ae..094d471 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java
@@ -18,7 +18,6 @@
package org.apache.giraph.edge;
-import com.google.common.collect.UnmodifiableIterator;
import it.unimi.dsi.fastutil.longs.LongIterator;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import org.apache.hadoop.io.LongWritable;
@@ -40,8 +39,8 @@ import java.util.Iterator;
*/
public class LongNullHashSetEdges
extends ConfigurableVertexEdges<LongWritable, NullWritable>
- implements StrictRandomAccessVertexEdges<LongWritable, NullWritable>,
- ReuseObjectsVertexEdges<LongWritable, NullWritable> {
+ implements ReuseObjectsVertexEdges<LongWritable, NullWritable>,
+ MutableVertexEdges<LongWritable, NullWritable> {
/** Hash set of target vertex ids. */
private LongOpenHashSet neighbors;
@@ -81,11 +80,6 @@ public class LongNullHashSetEdges
}
@Override
- public NullWritable getEdgeValue(LongWritable targetVertexId) {
- return NullWritable.get();
- }
-
- @Override
public int size() {
return neighbors.size();
}
@@ -93,12 +87,19 @@ public class LongNullHashSetEdges
@Override
public Iterator<Edge<LongWritable, NullWritable>> iterator() {
// Returns an iterator that reuses objects.
- return new UnmodifiableIterator<Edge<LongWritable, NullWritable>>() {
+ // The downcast is fine because all concrete Edge implementations are
+ // mutable, but we only expose the mutation functionality when appropriate.
+ return (Iterator) mutableIterator();
+ }
+
+ @Override
+ public Iterator<MutableEdge<LongWritable, NullWritable>> mutableIterator() {
+ return new Iterator<MutableEdge<LongWritable, NullWritable>>() {
/** Wrapped neighbors iterator. */
private LongIterator neighborsIt = neighbors.iterator();
/** Representative edge object. */
- private MutableEdge<LongWritable, NullWritable> representativeEdge =
- getConf().createMutableEdge();
+ private ReusableEdge<LongWritable, NullWritable> representativeEdge =
+ getConf().createReusableEdge();
@Override
public boolean hasNext() {
@@ -106,10 +107,15 @@ public class LongNullHashSetEdges
}
@Override
- public Edge<LongWritable, NullWritable> next() {
+ public MutableEdge<LongWritable, NullWritable> next() {
representativeEdge.getTargetVertexId().set(neighborsIt.nextLong());
return representativeEdge;
}
+
+ @Override
+ public void remove() {
+ neighborsIt.remove();
+ }
};
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/MapMutableEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/MapMutableEdge.java b/giraph-core/src/main/java/org/apache/giraph/edge/MapMutableEdge.java
new file mode 100644
index 0000000..cb8702c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/MapMutableEdge.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.Map;
+
+/**
+ * Helper class for a mutable edge that modifies the backing map entry.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public class MapMutableEdge<I extends WritableComparable, E extends Writable>
+ implements MutableEdge<I, E> {
+ /** Backing entry for the edge in the map. */
+ private Map.Entry<I, E> entry;
+
+ /**
+ * Set the backing entry for this edge in the map.
+ *
+ * @param entry Backing entry
+ */
+ public void setEntry(Map.Entry<I, E> entry) {
+ this.entry = entry;
+ }
+
+ @Override
+ public void setValue(E value) {
+ // Replace the value in the map.
+ entry.setValue(value);
+ }
+
+ @Override
+ public I getTargetVertexId() {
+ return entry.getKey();
+ }
+
+ @Override
+ public E getValue() {
+ return entry.getValue();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdge.java b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdge.java
index bf00b4f..78e34e6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdge.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdge.java
@@ -22,9 +22,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
/**
- * A complete edge, the target vertex and the edge value. Can only be one
- * edge with a destination vertex id per edge map. This edge can be mutated,
- * that is you can set it's target vertex ID and edge value.
+ * An edge whose value can be modified.
*
* @param <I> Vertex index
* @param <E> Edge value
@@ -32,16 +30,10 @@ import org.apache.hadoop.io.WritableComparable;
public interface MutableEdge<I extends WritableComparable, E extends Writable>
extends Edge<I, E> {
/**
- * Set the destination vertex index of this edge.
- *
- * @param targetVertexId new destination vertex
- */
- void setTargetVertexId(I targetVertexId);
-
- /**
* Set the value for this edge.
*
* @param value new edge value
*/
void setValue(E value);
}
+
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesIterable.java b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesIterable.java
new file mode 100644
index 0000000..9dab09c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesIterable.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.Iterator;
+
+/**
+ * Helper class to provide a mutable iterable over the edges when the chosen
+ * {@link VertexEdges} doesn't offer a specialized one.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public class MutableEdgesIterable<I extends WritableComparable,
+ E extends Writable> implements Iterable<MutableEdge<I, E>> {
+ /** Vertex that owns the out-edges. */
+ private Vertex<I, ?, E, ?> vertex;
+
+ /**
+ * Constructor.
+ *
+ * @param vertex Owning vertex
+ */
+ public MutableEdgesIterable(Vertex<I, ?, E, ?> vertex) {
+ this.vertex = vertex;
+ }
+
+ @Override
+ public Iterator<MutableEdge<I, E>> iterator() {
+ final MutableEdgesWrapper<I, E> mutableEdgesWrapper =
+ MutableEdgesWrapper.wrap((VertexEdges<I, E>) vertex.getEdges(),
+ vertex.getConf());
+ vertex.setEdges(mutableEdgesWrapper);
+
+ return new Iterator<MutableEdge<I, E>>() {
+ /** Iterator over the old edges. */
+ private Iterator<Edge<I, E>> oldEdgesIterator =
+ mutableEdgesWrapper.getOldEdgesIterator();
+ /** New edges data structure. */
+ private VertexEdges<I, E> newEdges = mutableEdgesWrapper.getNewEdges();
+
+ @Override
+ public boolean hasNext() {
+ // If the current edge is not null,
+ // we need to add it to the new edges.
+ Edge<I, E> currentEdge = mutableEdgesWrapper.getCurrentEdge();
+ if (currentEdge != null) {
+ newEdges.add(currentEdge);
+ mutableEdgesWrapper.setCurrentEdge(null);
+ }
+ if (!oldEdgesIterator.hasNext()) {
+ vertex.setEdges(newEdges);
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ @Override
+ public MutableEdge<I, E> next() {
+ // If the current edge is not null,
+ // we need to add it to the new edges.
+ MutableEdge<I, E> currentEdge =
+ mutableEdgesWrapper.getCurrentEdge();
+ if (currentEdge != null) {
+ newEdges.add(currentEdge);
+ }
+ // Read the next edge and return it.
+ currentEdge = (MutableEdge<I, E>) oldEdgesIterator.next();
+ mutableEdgesWrapper.setCurrentEdge(currentEdge);
+ return currentEdge;
+ }
+
+ @Override
+ public void remove() {
+ // Set the current edge to null, so that it's not added to the
+ // new edges.
+ mutableEdgesWrapper.setCurrentEdge(null);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesWrapper.java b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesWrapper.java
new file mode 100644
index 0000000..5b20bf7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesWrapper.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Helper class that wraps the current out-edges and inserts them into a new
+ * data structure as they are iterated over.
+ * Used by Vertex to provide a mutable iterator when the chosen
+ * {@link VertexEdges} doesn't offer a specialized one.
+ * The edges are "unwrapped" back to the chosen {@link VertexEdges} data
+ * structure as soon as possible: either when the iterator is exhausted,
+ * or after compute() if iteration has been terminated early.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public class MutableEdgesWrapper<I extends WritableComparable,
+ E extends Writable> implements VertexEdges<I, E> {
+ /** New edges data structure (initially empty). */
+ private final VertexEdges<I, E> newEdges;
+ /** Iterator over the old edges. */
+ private final Iterator<Edge<I, E>> oldEdgesIterator;
+ /** Last edge that was returned during iteration. */
+ private MutableEdge<I, E> currentEdge;
+
+ /**
+ * Private constructor: instantiation happens through the {@code wrap()}
+ * factory method.
+ *
+ * @param oldEdges Current out-edges
+ * @param newEdges New (empty) edges data structure
+ */
+ private MutableEdgesWrapper(VertexEdges<I, E> oldEdges,
+ VertexEdges<I, E> newEdges) {
+ oldEdgesIterator = oldEdges.iterator();
+ this.newEdges = newEdges;
+ }
+
+ /**
+ * Factory method to create a new wrapper over the existing out-edges.
+ *
+ * @param edges Current out-edges
+ * @param conf Configuration
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ * @return The wrapped edges
+ */
+ public static <I extends WritableComparable, E extends Writable>
+ MutableEdgesWrapper<I, E> wrap(
+ VertexEdges<I, E> edges,
+ ImmutableClassesGiraphConfiguration<I, ?, E, ?> conf) {
+ MutableEdgesWrapper<I, E> wrapper = new MutableEdgesWrapper<I, E>(
+ edges, conf.createAndInitializeVertexEdges(edges.size()));
+ return wrapper;
+ }
+
+ /**
+ * Moves all the remaining edges to the new data structure, and returns it.
+ *
+ * @return The new {@link VertexEdges} data structure.
+ */
+ public VertexEdges<I, E> unwrap() {
+ if (currentEdge != null) {
+ newEdges.add(currentEdge);
+ currentEdge = null;
+ }
+ while (oldEdgesIterator.hasNext()) {
+ newEdges.add(oldEdgesIterator.next());
+ }
+ return newEdges;
+ }
+
+ /**
+ * Get the new {@link VertexEdges} data structure.
+ *
+ * @return New edges
+ */
+ public VertexEdges<I, E> getNewEdges() {
+ return newEdges;
+ }
+
+ /**
+ * Get the iterator over the old edges data structure.
+ *
+ * @return Old edges iterator
+ */
+ public Iterator<Edge<I, E>> getOldEdgesIterator() {
+ return oldEdgesIterator;
+ }
+
+ /**
+ * Get the last edge returned by the mutable iterator.
+ *
+ * @return Last edge iterated on
+ */
+ public MutableEdge<I, E> getCurrentEdge() {
+ return currentEdge;
+ }
+
+ /**
+ * Set the last edge returned by the mutable iterator.
+ *
+ * @param edge Last edge iterated on
+ */
+ public void setCurrentEdge(MutableEdge<I, E> edge) {
+ currentEdge = edge;
+ }
+
+ @Override
+ public void initialize(Iterable<Edge<I, E>> edges) {
+ throw new IllegalStateException("initialize: MutableEdgesWrapper should " +
+ "never be initialized.");
+ }
+
+ @Override
+ public void initialize(int capacity) {
+ throw new IllegalStateException("initialize: MutableEdgesWrapper should " +
+ "never be initialized.");
+ }
+
+ @Override
+ public void initialize() {
+ throw new IllegalStateException("initialize: MutableEdgesWrapper should " +
+ "never be initialized.");
+ }
+
+ @Override
+ public void add(Edge<I, E> edge) {
+ unwrap().add(edge);
+ }
+
+ @Override
+ public void remove(I targetVertexId) {
+ unwrap().remove(targetVertexId);
+ }
+
+ @Override
+ public int size() {
+ return unwrap().size();
+ }
+
+ @Override
+ public Iterator<Edge<I, E>> iterator() {
+ return unwrap().iterator();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ throw new IllegalStateException("write: MutableEdgesWrapper should " +
+ "never be serialized.");
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ throw new IllegalStateException("readFields: MutableEdgesWrapper should " +
+ "never be deserialized.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/MutableVertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/MutableVertexEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/MutableVertexEdges.java
new file mode 100644
index 0000000..503de92
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/MutableVertexEdges.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.Iterator;
+
+/**
+ * Interface for {@link VertexEdges} implementations that have an optimized
+ * mutable edge iterator.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public interface MutableVertexEdges<I extends WritableComparable,
+ E extends Writable> extends VertexEdges<I, E> {
+ /**
+ * Returns an iterator over edges that can be modified in-place,
+ * either by changing the current edge value or by removing the current edge.
+ *
+ * @return A mutable edge iterator
+ */
+ Iterator<MutableEdge<I, E>> mutableIterator();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/ReusableEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/ReusableEdge.java b/giraph-core/src/main/java/org/apache/giraph/edge/ReusableEdge.java
new file mode 100644
index 0000000..d55e25b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/ReusableEdge.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A complete edge, the target vertex and the edge value. Can only be one
+ * edge with a destination vertex id per edge map. This edge can be reused,
+ * that is you can set it's target vertex ID and edge value.
+ * Note: this class is useful for certain optimizations,
+ * but it's not meant to be exposed to the user. Look at {@link MutableEdge}
+ * instead.
+ *
+ * @param <I> Vertex index
+ * @param <E> Edge value
+ */
+public interface ReusableEdge<I extends WritableComparable, E extends Writable>
+ extends MutableEdge<I, E> {
+ /**
+ * Set the destination vertex index of this edge.
+ *
+ * @param targetVertexId new destination vertex
+ */
+ void setTargetVertexId(I targetVertexId);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java
index 36381a7..5fdc2d2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java
@@ -39,4 +39,13 @@ public interface StrictRandomAccessVertexEdges<I extends WritableComparable,
* @return Edge value
*/
E getEdgeValue(I targetVertexId);
+
+ /**
+ * Set the edge value for the given target vertex id (if an edge to that
+ * vertex exists).
+ *
+ * @param targetVertexId Target vertex id
+ * @param edgeValue Edge value
+ */
+ void setEdgeValue(I targetVertexId, E edgeValue);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 439ee5b..4840471 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -223,6 +223,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
} finally {
computeOneTimerContext.stop();
}
+ // Need to unwrap the mutated edges (possibly)
+ vertex.unwrapMutableEdges();
// Need to save the vertex changes (possibly)
partition.saveVertex(vertex);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
index 66f081a..3031274 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
@@ -23,6 +23,10 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.MultiRandomAccessVertexEdges;
+import org.apache.giraph.edge.MutableEdge;
+import org.apache.giraph.edge.MutableEdgesIterable;
+import org.apache.giraph.edge.MutableEdgesWrapper;
+import org.apache.giraph.edge.MutableVertexEdges;
import org.apache.giraph.edge.StrictRandomAccessVertexEdges;
import org.apache.giraph.edge.VertexEdges;
import org.apache.giraph.partition.PartitionContext;
@@ -186,6 +190,42 @@ public abstract class Vertex<I extends WritableComparable,
}
/**
+ * Get an iterable of out-edges that can be modified in-place.
+ * This can mean changing the current edge value or removing the current edge
+ * (by using the iterator version).
+ * Note: if
+ *
+ * @return An iterable of mutable out-edges
+ */
+ public Iterable<MutableEdge<I, E>> getMutableEdges() {
+ // If the VertexEdges implementation has a specialized mutable iterator,
+ // we use that; otherwise, we build a new data structure as we iterate
+ // over the current edges.
+ if (edges instanceof MutableVertexEdges) {
+ return new Iterable<MutableEdge<I, E>>() {
+ @Override
+ public Iterator<MutableEdge<I, E>> iterator() {
+ return ((MutableVertexEdges<I, E>) edges).mutableIterator();
+ }
+ };
+ } else {
+ return new MutableEdgesIterable<I, E>(this);
+ }
+ }
+
+ /**
+ * If a {@link MutableEdgesWrapper} was used to provide a mutable iterator,
+ * copy any remaining edges to the new {@link VertexEdges} data
+ * structure and keep a direct reference to it (thus discarding the wrapper).
+ * Called by the Giraph infrastructure after computation.
+ */
+ public void unwrapMutableEdges() {
+ if (edges instanceof MutableEdgesWrapper) {
+ edges = ((MutableEdgesWrapper<I, E>) edges).unwrap();
+ }
+ }
+
+ /**
* Get the number of outgoing edges on this vertex.
*
* @return the total number of outbound edges from this vertex
@@ -205,6 +245,8 @@ public abstract class Vertex<I extends WritableComparable,
* @return Edge value (or null if missing)
*/
public E getEdgeValue(I targetVertexId) {
+ // If the VertexEdges implementation has a specialized random-access
+ // method, we use that; otherwise, we scan the edges.
if (edges instanceof StrictRandomAccessVertexEdges) {
return ((StrictRandomAccessVertexEdges<I, E>) edges)
.getEdgeValue(targetVertexId);
@@ -219,6 +261,28 @@ public abstract class Vertex<I extends WritableComparable,
}
/**
+ * If an edge to the target vertex exists, set it to the given edge value.
+ * This only makes sense with strict graphs.
+ *
+ * @param targetVertexId Target vertex id
+ * @param edgeValue Edge value
+ */
+ public void setEdgeValue(I targetVertexId, E edgeValue) {
+ // If the VertexEdges implementation has a specialized random-access
+ // method, we use that; otherwise, we scan the edges.
+ if (edges instanceof StrictRandomAccessVertexEdges) {
+ ((StrictRandomAccessVertexEdges<I, E>) edges).setEdgeValue(
+ targetVertexId, edgeValue);
+ } else {
+ for (MutableEdge<I, E> edge : getMutableEdges()) {
+ if (edge.getTargetVertexId().equals(targetVertexId)) {
+ edge.setValue(edgeValue);
+ }
+ }
+ }
+ }
+
+ /**
* Get an iterable over the values of all edges with the given target
* vertex id. This only makes sense for multigraphs (i.e. graphs with
* parallel edges).
@@ -230,6 +294,8 @@ public abstract class Vertex<I extends WritableComparable,
* @return Iterable of edge values
*/
public Iterable<E> getAllEdgeValues(final I targetVertexId) {
+ // If the VertexEdges implementation has a specialized random-access
+ // method, we use that; otherwise, we scan the edges.
if (edges instanceof MultiRandomAccessVertexEdges) {
return ((MultiRandomAccessVertexEdges<I, E>) edges)
.getAllEdgeValues(targetVertexId);
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
index 8a5fb01..2c5f2f7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
@@ -47,7 +47,7 @@ public class ByteArrayVertexIdEdges<I extends WritableComparable,
@Override
public Edge<I, E> createData() {
- return getConf().createMutableEdge();
+ return getConf().createReusableEdge();
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/utils/PairListWritable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/PairListWritable.java b/giraph-core/src/main/java/org/apache/giraph/utils/PairListWritable.java
deleted file mode 100644
index 413656b..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/PairListWritable.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.Lists;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Writable implementation of {@link PairList}.
- *
- * @param <U> Type of the first element in a pair
- * @param <V> Type of the second element in a pair
- */
-public abstract class PairListWritable<U extends Writable,
- V extends Writable> extends PairList<U, V> implements Writable {
- /**
- * Create an empty instance of the first element in the pair,
- * so we could read it from {@DataInput}.
- *
- * @return New instance of the first element in the pair
- */
- protected abstract U newFirstInstance();
-
- /**
- * Create an empty instance of the second element in the pair,
- * so we could read it from {@DataInput}.
- *
- * @return New instance of the second element in the pair
- */
- protected abstract V newSecondInstance();
-
- @Override
- public void write(DataOutput output) throws IOException {
- int size = getSize();
- output.writeInt(size);
- for (int i = 0; i < size; i++) {
- firstList.get(i).write(output);
- secondList.get(i).write(output);
- }
- }
-
- @Override
- public void readFields(DataInput input) throws IOException {
- int size = input.readInt();
- firstList = Lists.newArrayListWithCapacity(size);
- secondList = Lists.newArrayListWithCapacity(size);
- while (size-- > 0) {
- U first = newFirstInstance();
- first.readFields(input);
- V second = newSecondInstance();
- second.readFields(input);
- add(first, second);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/test/java/org/apache/giraph/edge/TestNullValueEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/edge/TestNullValueEdges.java b/giraph-core/src/test/java/org/apache/giraph/edge/TestNullValueEdges.java
index ef3fbc8..79bf093 100644
--- a/giraph-core/src/test/java/org/apache/giraph/edge/TestNullValueEdges.java
+++ b/giraph-core/src/test/java/org/apache/giraph/edge/TestNullValueEdges.java
@@ -25,6 +25,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import static org.apache.giraph.graph.TestVertexAndEdges.instantiateVertexEdges;
@@ -35,7 +36,7 @@ import static org.junit.Assert.assertEquals;
*/
public class TestNullValueEdges {
/** {@link VertexEdges} classes to be tested. */
- private Collection<Class<? extends VertexEdges>>
+ private Collection<Class<? extends MutableVertexEdges>>
edgesClasses = Lists.newArrayList();
@Before
@@ -65,10 +66,51 @@ public class TestNullValueEdges {
edges.initialize(initialEdges);
assertEquals(3, edges.size());
- edges.add(EdgeFactory.createMutable(new LongWritable(4)));
+ edges.add(EdgeFactory.createReusable(new LongWritable(4)));
assertEquals(4, edges.size());
edges.remove(new LongWritable(2));
assertEquals(3, edges.size());
}
+
+ /**
+ * Test in-place edge mutations via the iterable returned by {@link
+ * org.apache.giraph.graph.Vertex#getMutableEdges()}.
+ */
+ @Test
+ public void testMutateEdges() {
+ for (Class<? extends MutableVertexEdges> edgesClass : edgesClasses) {
+ testMutateEdgesClass(edgesClass);
+ }
+ }
+
+ private void testMutateEdgesClass(
+ Class<? extends MutableVertexEdges> edgesClass) {
+ MutableVertexEdges<LongWritable, NullWritable> edges =
+ (MutableVertexEdges<LongWritable, NullWritable>)
+ instantiateVertexEdges(edgesClass);
+
+ edges.initialize();
+
+ // Add 10 edges with id i, for i = 0..9
+ for (int i = 0; i < 10; ++i) {
+ edges.add(EdgeFactory.create(new LongWritable(i)));
+ }
+
+ // Use the mutable iterator to remove edges with even id
+ Iterator<MutableEdge<LongWritable, NullWritable>> edgeIt =
+ edges.mutableIterator();
+ while (edgeIt.hasNext()) {
+ if (edgeIt.next().getTargetVertexId().get() % 2 == 0) {
+ edgeIt.remove();
+ }
+ }
+
+ // We should now have 5 edges
+ assertEquals(5, edges.size());
+ // The edge ids should be all odd
+ for (Edge<LongWritable, NullWritable> edge : edges) {
+ assertEquals(1, edge.getTargetVertexId().get() % 2);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/test/java/org/apache/giraph/edge/TestStrictRandomAccessEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/edge/TestStrictRandomAccessEdges.java b/giraph-core/src/test/java/org/apache/giraph/edge/TestStrictRandomAccessEdges.java
index e6daa94..d3ad069 100644
--- a/giraph-core/src/test/java/org/apache/giraph/edge/TestStrictRandomAccessEdges.java
+++ b/giraph-core/src/test/java/org/apache/giraph/edge/TestStrictRandomAccessEdges.java
@@ -74,5 +74,8 @@ public class TestStrictRandomAccessEdges {
assertEquals(3.0, edges.getEdgeValue(new LongWritable(3)).get(), 0.0);
assertNull(edges.getEdgeValue(new LongWritable(55)));
+
+ edges.setEdgeValue(new LongWritable(2), new DoubleWritable(33.0));
+ assertEquals(33.0, edges.getEdgeValue(new LongWritable(2)).get(), 0);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java b/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
index 425abe7..fb5b685 100644
--- a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
+++ b/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
@@ -28,6 +28,7 @@ import org.apache.giraph.edge.HashMapEdges;
import org.apache.giraph.edge.HashMultimapEdges;
import org.apache.giraph.edge.LongDoubleArrayEdges;
import org.apache.giraph.edge.LongDoubleHashMapEdges;
+import org.apache.giraph.edge.MutableEdge;
import org.apache.giraph.edge.VertexEdges;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
@@ -44,8 +45,11 @@ import org.apache.hadoop.io.LongWritable;
import org.junit.Before;
import org.junit.Test;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import static org.junit.Assert.assertEquals;
@@ -53,7 +57,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
- * Test {@link Vertex} functionality across the provided {@link org.apache.giraph.edge.VertexEdges}
+ * Test {@link Vertex} functionality across the provided {@link VertexEdges}
* classes.
*/
public class TestVertexAndEdges {
@@ -72,8 +76,82 @@ public class TestVertexAndEdges {
public void compute(Iterable<LongWritable> messages) { }
}
+ /**
+ * A basic {@link VertexEdges} implementation that doesn't provide any
+ * special functionality. Used to test the default implementations of
+ * Vertex#getEdgeValue(), Vertex#getMutableEdges(), etc.
+ */
+ public static class TestVertexEdges
+ implements VertexEdges<LongWritable, DoubleWritable> {
+ private List<Edge<LongWritable, DoubleWritable>> edgeList;
+
+
+ @Override
+ public void initialize(Iterable<Edge<LongWritable, DoubleWritable>> edges) {
+ this.edgeList = Lists.newArrayList(edges);
+ }
+
+ @Override
+ public void initialize(int capacity) {
+ this.edgeList = Lists.newArrayListWithCapacity(capacity);
+ }
+
+ @Override
+ public void initialize() {
+ this.edgeList = Lists.newArrayList();
+ }
+
+ @Override
+ public void add(Edge<LongWritable, DoubleWritable> edge) {
+ edgeList.add(edge);
+ }
+
+ @Override
+ public void remove(LongWritable targetVertexId) {
+ for (Iterator<Edge<LongWritable, DoubleWritable>> edges =
+ edgeList.iterator(); edges.hasNext();) {
+ Edge<LongWritable, DoubleWritable> edge = edges.next();
+ if (edge.getTargetVertexId().equals(targetVertexId)) {
+ edges.remove();
+ }
+ }
+ }
+
+ @Override
+ public int size() {
+ return edgeList.size();
+ }
+
+ @Override
+ public Iterator<Edge<LongWritable, DoubleWritable>> iterator() {
+ return edgeList.iterator();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(edgeList.size());
+ for (Edge<LongWritable, DoubleWritable> edge : edgeList) {
+ edge.getTargetVertexId().write(out);
+ edge.getValue().write(out);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int numEdges = in.readInt();
+ initialize(numEdges);
+ for (int i = 0; i < numEdges; ++i) {
+ Edge<LongWritable, DoubleWritable> edge = EdgeFactory.createReusable(
+ new LongWritable(), new DoubleWritable());
+ WritableUtils.readEdge(in, edge);
+ edgeList.add(edge);
+ }
+ }
+ }
+
@Before
public void setUp() {
+ edgesClasses.add(TestVertexEdges.class);
edgesClasses.add(ByteArrayEdges.class);
edgesClasses.add(ArrayListEdges.class);
edgesClasses.add(HashMapEdges.class);
@@ -157,6 +235,95 @@ public class TestVertexAndEdges {
for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
assert(edge.getTargetVertexId().get() != 500);
}
+
+ vertex.setEdgeValue(new LongWritable(10), new DoubleWritable(33.0));
+ assertEquals(33.0, vertex.getEdgeValue(new LongWritable(10)).get(), 0);
+ }
+
+ /**
+ * Test in-place edge mutations via the iterable returned by {@link
+ * Vertex#getMutableEdges()}.
+ */
+ @Test
+ public void testMutateEdges() {
+ for (Class<? extends VertexEdges> edgesClass : edgesClasses) {
+ testMutateEdgesClass(edgesClass);
+ }
+ }
+
+ private void testMutateEdgesClass(Class<? extends VertexEdges> edgesClass) {
+ Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable> vertex =
+ instantiateVertex(edgesClass);
+ VertexEdges<LongWritable, DoubleWritable> vertexEdges =
+ instantiateVertexEdges(edgesClass);
+
+ vertexEdges.initialize();
+ vertex.initialize(new LongWritable(0), new FloatWritable(0), vertexEdges);
+
+ // Add 10 edges with id i, value i for i = 0..9
+ for (int i = 0; i < 10; ++i) {
+ vertex.addEdge(EdgeFactory.create(
+ new LongWritable(i), new DoubleWritable(i)));
+ }
+
+ // Use the mutable iterable to multiply each edge value by 2
+ for (MutableEdge<LongWritable, DoubleWritable> edge :
+ vertex.getMutableEdges()) {
+ edge.setValue(new DoubleWritable(edge.getValue().get() * 2));
+ }
+
+ // We should still have 10 edges
+ assertEquals(10, vertex.getNumEdges());
+ // The edge values should now be double the ids
+ for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
+ long id = edge.getTargetVertexId().get();
+ double value = edge.getValue().get();
+ assertEquals(id * 2, value, 0);
+ }
+
+ // Use the mutable iterator to remove edges with even id
+ Iterator<MutableEdge<LongWritable, DoubleWritable>> edgeIt =
+ vertex.getMutableEdges().iterator();
+ while (edgeIt.hasNext()) {
+ if (edgeIt.next().getTargetVertexId().get() % 2 == 0) {
+ edgeIt.remove();
+ }
+ }
+
+ // We should now have 5 edges
+ assertEquals(5, vertex.getNumEdges());
+ // The edge ids should be all odd
+ for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
+ assertEquals(1, edge.getTargetVertexId().get() % 2);
+ }
+
+ // Breaking iteration early should not make us lose edges.
+ // This version uses repeated calls to next():
+ Iterator<MutableEdge<LongWritable, DoubleWritable>> it =
+ vertex.getMutableEdges().iterator();
+ it.next();
+ it.next();
+ assertEquals(5, vertex.getNumEdges());
+
+ // This version uses a for-each loop, and the break statement:
+ int i = 2;
+ for (MutableEdge<LongWritable, DoubleWritable> edge :
+ vertex.getMutableEdges()) {
+ System.out.println(edge.toString());
+ if (i-- == 0) {
+ break;
+ }
+ }
+ assertEquals(5, vertex.getNumEdges());
+
+ // This version uses a normal, immutable iterable:
+ i = 2;
+ for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
+ if (i-- == 0) {
+ break;
+ }
+ }
+ assertEquals(5, vertex.getNumEdges());
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
index e0c502c..6fb183a 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
@@ -20,7 +20,7 @@ package org.apache.giraph.hive.input.edge;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
-import org.apache.giraph.edge.MutableEdge;
+import org.apache.giraph.edge.ReusableEdge;
import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.io.Writable;
@@ -62,7 +62,7 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
* If we are reusing edges this will be the single edge to read into.
* Otherwise if it's null we will create a new edge each time.
*/
- private MutableEdge<I, E> edgeToReuse = null;
+ private ReusableEdge<I, E> edgeToReuse = null;
/**
* Get underlying Hive record reader used.
@@ -117,7 +117,7 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
conf = new ImmutableClassesGiraphConfiguration(context.getConfiguration());
instantiateHiveToEdgeFromConf();
if (conf.getBoolean(REUSE_EDGE_KEY, false)) {
- edgeToReuse = conf.createMutableEdge();
+ edgeToReuse = conf.createReusableEdge();
}
}
@@ -160,9 +160,9 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
public Edge<I, E> getCurrentEdge() throws IOException,
InterruptedException {
HiveRecord record = hiveRecordReader.getCurrentValue();
- MutableEdge<I, E> edge = edgeToReuse;
+ ReusableEdge<I, E> edge = edgeToReuse;
if (edge == null) {
- edge = conf.createMutableEdge();
+ edge = conf.createReusableEdge();
}
edge.setValue(hiveToEdge.getEdgeValue(record));
edge.setTargetVertexId(hiveToEdge.getTargetVertexId(record));