You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2013/03/07 06:37:41 UTC
[7/8] GIRAPH-528: Decouple vertex implementation from edge storage
(apresta)
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index 0fc1858..d1e99cf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -39,7 +39,7 @@ import org.apache.giraph.comm.requests.WorkerRequest;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Edge;
+import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MetricNames;
@@ -49,7 +49,7 @@ import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.utils.ByteArrayVertexIdEdges;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.PairList;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index 697b6ce..1fb0580 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -34,7 +34,7 @@ import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.GraphState;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.partition.Partition;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
index f301bbf..793768a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
@@ -19,7 +19,7 @@
package org.apache.giraph.comm.requests;
import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.graph.Edge;
+import org.apache.giraph.edge.Edge;
import org.apache.giraph.utils.ByteArrayVertexIdEdges;
import org.apache.giraph.utils.PairList;
import org.apache.hadoop.io.Writable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index 5c2a01a..5090250 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -18,24 +18,27 @@
package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
+import org.apache.giraph.aggregators.TextAggregatorWriter;
import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.edge.VertexEdges;
import org.apache.giraph.graph.DefaultVertexResolver;
-import org.apache.giraph.partition.DefaultPartitionContext;
-import org.apache.giraph.partition.PartitionContext;
-import org.apache.giraph.worker.DefaultWorkerContext;
+import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.io.EdgeInputFormat;
-import org.apache.giraph.master.MasterCompute;
-import org.apache.giraph.aggregators.TextAggregatorWriter;
-import org.apache.giraph.vertex.Vertex;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
-import org.apache.giraph.graph.VertexResolver;
-import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.master.MasterCompute;
+import org.apache.giraph.partition.DefaultPartitionContext;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.partition.HashPartitionerFactory;
import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionContext;
import org.apache.giraph.partition.SimplePartition;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.worker.DefaultWorkerContext;
+import org.apache.giraph.worker.WorkerContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -50,6 +53,7 @@ import java.util.List;
* @param <E> Edge class
* @param <M> Message class
*/
+@SuppressWarnings("unchecked")
public class GiraphClasses<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
implements GiraphConstants {
@@ -63,6 +67,8 @@ public class GiraphClasses<I extends WritableComparable,
protected Class<E> edgeValueClass;
/** Message value class - cached for fast access */
protected Class<M> messageValueClass;
+ /** Vertex edges class - cached for fast access */
+ protected Class<? extends VertexEdges<I, E>> vertexEdgesClass;
/** Graph partitioner factory class - cached for fast access */
protected Class<? extends GraphPartitionerFactory<I, V, E, M>>
@@ -96,9 +102,25 @@ public class GiraphClasses<I extends WritableComparable,
protected Class<? extends Partition<I, V, E, M>> partitionClass;
/**
- * Empty constructor. Initialize with classes all null.
+ * Empty constructor. Initialize with default classes or null.
*/
- public GiraphClasses() { }
+  public GiraphClasses() {
+    // Start from the built-in defaults. Casting through Object is needed
+    // because javac rejects a direct downcast to the parameterized type.
+    aggregatorWriterClass = TextAggregatorWriter.class;
+    masterComputeClass = DefaultMasterCompute.class;
+    workerContextClass = DefaultWorkerContext.class;
+    partitionContextClass = DefaultPartitionContext.class;
+    vertexEdgesClass = (Class<? extends VertexEdges<I, E>>) (Object)
+        ByteArrayEdges.class;
+    partitionClass = (Class<? extends Partition<I, V, E, M>>) (Object)
+        SimplePartition.class;
+    vertexResolverClass = (Class<? extends VertexResolver<I, V, E, M>>)
+        (Object) DefaultVertexResolver.class;
+    graphPartitionerFactoryClass =
+        (Class<? extends GraphPartitionerFactory<I, V, E, M>>) (Object)
+        HashPartitionerFactory.class;
+  }
/**
* Contructor that reads classes from a Configuration object.
@@ -118,13 +140,15 @@ public class GiraphClasses<I extends WritableComparable,
// set pre-validated generic parameter types into Configuration
vertexClass = (Class<? extends Vertex<I, V, E, M>>)
conf.getClass(VERTEX_CLASS, null, Vertex.class);
- List<Class<?>> classList =
- org.apache.giraph.utils.ReflectionUtils.<Vertex>getTypeArguments(
- Vertex.class, vertexClass);
+ List<Class<?>> classList = ReflectionUtils.getTypeArguments(Vertex.class,
+ vertexClass);
vertexIdClass = (Class<I>) classList.get(0);
vertexValueClass = (Class<V>) classList.get(1);
edgeValueClass = (Class<E>) classList.get(2);
messageValueClass = (Class<M>) classList.get(3);
+ vertexEdgesClass = (Class<? extends VertexEdges<I, E>>)
+ conf.getClass(VERTEX_EDGES_CLASS, ByteArrayEdges.class,
+ VertexEdges.class);
graphPartitionerFactoryClass =
(Class<? extends GraphPartitionerFactory<I, V, E, M>>)
@@ -206,12 +230,12 @@ public class GiraphClasses<I extends WritableComparable,
}
/**
- * Check if we GraphPartitionerFactory is set
+ * Get the {@link VertexEdges} implementation class
*
- * @return true if GraphPartitionerFactory is set
+ * @return Vertex edges class (ByteArrayEdges unless configured otherwise)
*/
- public boolean hasGraphPartitionerFactoryClass() {
- return graphPartitionerFactoryClass != null;
+ public Class<? extends VertexEdges<I, E>> getVertexEdgesClass() {
+ return vertexEdgesClass;
}
/**
@@ -463,6 +487,19 @@ public class GiraphClasses<I extends WritableComparable,
}
/**
+ * Set the {@link VertexEdges} implementation class held (stored through
+ * an unchecked cast to {@code VertexEdges<I, E>}).
+ * @param vertexEdgesClass Vertex edges class to set
+ * @return this, for call chaining
+ */
+ public GiraphClasses setVertexEdgesClass(
+ Class<? extends VertexEdges> vertexEdgesClass) {
+ this.vertexEdgesClass =
+ (Class<? extends VertexEdges<I, E>>) vertexEdgesClass;
+ return this;
+ }
+
+ /**
* Set GraphPartitionerFactory class held
*
* @param klass GraphPartitionerFactory to set
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 3ea8d3b..6886d58 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.edge.VertexEdges;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.VertexInputFormat;
@@ -31,7 +32,7 @@ import org.apache.giraph.master.MasterObserver;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionContext;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerObserver;
import org.apache.hadoop.conf.Configuration;
@@ -74,6 +75,16 @@ public class GiraphConfiguration extends Configuration
}
/**
+ * Set the {@link VertexEdges} class, stored under VERTEX_EDGES_CLASS
+ *
+ * @param vertexEdgesClass Determines the way edges are stored
+ */
+ public final void setVertexEdgesClass(
+ Class<? extends VertexEdges> vertexEdgesClass) {
+ setClass(VERTEX_EDGES_CLASS, vertexEdgesClass, VertexEdges.class);
+ }
+
+ /**
* Set the vertex input format class (required)
*
* @param vertexInputFormatClass Determines how graph is input
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index fcdd57b..ad9073d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -24,6 +24,8 @@ package org.apache.giraph.conf;
public interface GiraphConstants {
/** Vertex class - required */
String VERTEX_CLASS = "giraph.vertexClass";
+ /** Vertex edges class - optional (defaults to ByteArrayEdges) */
+ String VERTEX_EDGES_CLASS = "giraph.vertexEdgesClass";
/** Class for Master - optional */
String MASTER_COMPUTE_CLASS = "giraph.masterComputeClass";
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index e6c4cc6..8457b8b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -20,10 +20,7 @@ package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
import org.apache.giraph.graph.GraphState;
-import org.apache.giraph.graph.MutableEdge;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.VertexInputFormat;
@@ -32,10 +29,8 @@ import org.apache.giraph.job.GiraphJobObserver;
import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.master.MasterObserver;
import org.apache.giraph.partition.GraphPartitionerFactory;
-import org.apache.giraph.partition.MasterGraphPartitioner;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionContext;
-import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.utils.ExtendedByteArrayDataInput;
import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
import org.apache.giraph.utils.ExtendedDataInput;
@@ -43,7 +38,11 @@ import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.edge.MutableEdge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.edge.VertexEdges;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerObserver;
import org.apache.hadoop.conf.Configuration;
@@ -62,12 +61,10 @@ import org.apache.hadoop.util.Progressable;
* @param <E> Edge data
* @param <M> Message data
*/
+@SuppressWarnings("unchecked")
public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
extends GiraphConfiguration {
- /** Master graph partitioner - cached for fast access */
- protected final MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner;
-
/** Holder for all the classes */
private final GiraphClasses classes;
@@ -85,11 +82,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
*/
public ImmutableClassesGiraphConfiguration(Configuration conf) {
super(conf);
-
classes = new GiraphClasses(conf);
- masterGraphPartitioner = (MasterGraphPartitioner<I, V, E, M>)
- createGraphPartitioner().createMasterGraphPartitioner();
-
useUnsafeSerialization = getBoolean(USE_UNSAFE_SERIALIZATION,
USE_UNSAFE_SERIALIZATION_DEFAULT);
}
@@ -127,24 +120,6 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
- * Create a user graph partitioner partition stats class
- *
- * @return Instantiated user graph partition stats class
- */
- public PartitionStats createGraphPartitionStats() {
- return getMasterGraphPartitioner().createPartitionStats();
- }
-
- /**
- * Get the cached MasterGraphPartitioner.
- *
- * @return MasterGraphPartitioner cached in this class.
- */
- public MasterGraphPartitioner<I, V, E, M> getMasterGraphPartitioner() {
- return masterGraphPartitioner;
- }
-
- /**
* Does the job have a {@link VertexInputFormat}?
*
* @return True iff a {@link VertexInputFormat} has been specified.
@@ -177,6 +152,15 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
+ * Does the job have a {@link VertexOutputFormat}?
+ * Delegates to the GiraphClasses held by this configuration.
+ * @return True iff a {@link VertexOutputFormat} has been specified.
+ */
+ public boolean hasVertexOutputFormat() {
+ return classes.hasVertexOutputFormat();
+ }
+
+ /**
* Get the user's subclassed
* {@link org.apache.giraph.io.VertexOutputFormat}.
*
@@ -247,6 +231,15 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
+ * Get the user's subclassed {@link Combiner} class.
+ * Delegates to the GiraphClasses held by this configuration.
+ * @return User's combiner class
+ */
+ public Class<? extends Combiner<I, M>> getCombinerClass() {
+ return classes.getCombinerClass();
+ }
+
+ /**
* Create a user combiner class
*
* @return Instantiated user combiner class
@@ -350,7 +343,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
- * Get the user's subclassed {@link org.apache.giraph.vertex.Vertex}
+ * Get the user's subclassed {@link org.apache.giraph.graph.Vertex}
*
* @return User's vertex class
*/
@@ -560,6 +553,51 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
+ * Get the user's subclassed {@link VertexEdges}
+ *
+ * @return Vertex edges class (ByteArrayEdges unless configured otherwise)
+ */
+ public Class<? extends VertexEdges<I, E>> getVertexEdgesClass() {
+ return classes.getVertexEdgesClass();
+ }
+
+ /**
+ * Create a new instance of the configured {@link VertexEdges} class.
+ * No initialize() method is called on it here.
+ * @return Instantiated user VertexEdges
+ */
+ public VertexEdges<I, E> createVertexEdges() {
+ return ReflectionUtils.newInstance(getVertexEdgesClass(), this);
+ }
+
+  /**
+   * Instantiate the configured {@link VertexEdges} class and initialize it
+   * for an expected number of edges.
+   *
+   * @param capacity Expected number of edges to be added
+   * @return Newly created, initialized VertexEdges
+   */
+  public VertexEdges<I, E> createAndInitializeVertexEdges(int capacity) {
+    VertexEdges<I, E> edges = createVertexEdges();
+    edges.initialize(capacity);
+    return edges;
+  }
+
+  /**
+   * Instantiate the configured {@link VertexEdges} class and initialize it
+   * from the given edges.
+   *
+   * @param edges Edges to add
+   * @return Newly created, initialized VertexEdges
+   */
+  public VertexEdges<I, E> createAndInitializeVertexEdges(
+      Iterable<Edge<I, E>> edges) {
+    VertexEdges<I, E> result = createVertexEdges();
+    result.initialize(edges);
+    return result;
+  }
+
+ /**
* Create a partition
*
* @param id Partition id
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java
new file mode 100644
index 0000000..68d4ec0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * {@link VertexEdges} implementation backed by an {@link ArrayList}.
+ * Parallel edges are allowed.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public class ArrayListEdges<I extends WritableComparable, E extends Writable>
+    extends ConfigurableVertexEdges<I, E> {
+  /** List of edges; null until one of the initialize methods runs. */
+  private ArrayList<Edge<I, E>> edgeList;
+
+  /**
+   * Initialize with the given edges. A null iterable produces an empty edge
+   * list, the same state as {@link #initialize()}.
+   *
+   * @param edges Edges to store (may be null)
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void initialize(Iterable<Edge<I, E>> edges) {
+    if (edges == null) {
+      // Do not leave edgeList null (or holding edges from an earlier
+      // initialization): add(), size() and iterator() dereference it.
+      initialize();
+    } else if (edges instanceof ArrayList) {
+      // Adopt the caller's ArrayList instead of copying it. The list is
+      // shared from then on, so later changes through either reference
+      // are visible to both.
+      edgeList = (ArrayList<Edge<I, E>>) edges;
+    } else {
+      edgeList = Lists.newArrayList(edges);
+    }
+  }
+
+  @Override
+  public void initialize(int capacity) {
+    edgeList = Lists.newArrayListWithCapacity(capacity);
+  }
+
+  @Override
+  public void initialize() {
+    edgeList = Lists.newArrayList();
+  }
+
+  @Override
+  public void add(Edge<I, E> edge) {
+    edgeList.add(edge);
+  }
+
+  /**
+   * Remove every edge pointing to the given vertex (all parallel edges).
+   *
+   * @param targetVertexId Target vertex id of the edges to remove
+   */
+  @Override
+  public void remove(I targetVertexId) {
+    for (Iterator<Edge<I, E>> edges = edgeList.iterator(); edges.hasNext();) {
+      Edge<I, E> edge = edges.next();
+      if (edge.getTargetVertexId().equals(targetVertexId)) {
+        edges.remove();
+      }
+    }
+  }
+
+  @Override
+  public int size() {
+    return edgeList.size();
+  }
+
+  @Override
+  public final Iterator<Edge<I, E>> iterator() {
+    return edgeList.iterator();
+  }
+
+  /**
+   * Serialize as the edge count followed by each edge's target vertex id
+   * and value.
+   *
+   * @param out Output to write to
+   * @throws IOException if writing fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(edgeList.size());
+    for (Edge<I, E> edge : edgeList) {
+      edge.getTargetVertexId().write(out);
+      edge.getValue().write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numEdges = in.readInt();
+    // Replace any existing edges; presize since the count is known.
+    initialize(numEdges);
+    for (int i = 0; i < numEdges; ++i) {
+      // A fresh Edge per entry: the list keeps a reference to each one.
+      Edge<I, E> edge = getConf().createEdge();
+      WritableUtils.readEdge(in, edge);
+      edgeList.add(edge);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
new file mode 100644
index 0000000..be74ad1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * {@link VertexEdges} implementation backed by a byte array.
+ * Parallel edges are allowed.
+ * Note: this implementation is optimized for space usage,
+ * but edge removals are expensive.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public class ByteArrayEdges<I extends WritableComparable, E extends Writable>
+    extends ConfigurableVertexEdges<I, E>
+    implements ReuseObjectsVertexEdges<I, E> {
+  /** Serialized edges; may be null when there are no edges. */
+  private byte[] serializedEdges;
+  /** Number of bytes used in serializedEdges. */
+  private int serializedEdgesBytesUsed;
+  /** Number of edges. */
+  private int edgeCount;
+
+  /**
+   * Initialize by serializing the given edges, replacing any current ones.
+   * A null iterable produces an empty edge set.
+   *
+   * @param edges Edges to store (may be null)
+   */
+  @Override
+  public void initialize(Iterable<Edge<I, E>> edges) {
+    ExtendedDataOutput extendedOutputStream =
+        getConf().createExtendedDataOutput();
+    // Count from zero so a previous edge count is not carried over.
+    edgeCount = 0;
+    if (edges != null) {
+      for (Edge<I, E> edge : edges) {
+        try {
+          WritableUtils.writeEdge(extendedOutputStream, edge);
+        } catch (IOException e) {
+          throw new IllegalStateException("initialize: Failed to serialize " +
+              edge, e);
+        }
+        ++edgeCount;
+      }
+    }
+    serializedEdges = extendedOutputStream.getByteArray();
+    serializedEdgesBytesUsed = extendedOutputStream.getPos();
+  }
+
+  @Override
+  public void initialize(int capacity) {
+    // We have no way to know the size in bytes used by a certain
+    // number of edges.
+    initialize();
+  }
+
+  /**
+   * Initialize with no edges, discarding any current ones.
+   */
+  @Override
+  public void initialize() {
+    // No need to allocate a byte array until an edge is added, since add()
+    // and iterator() work fine with a null buffer. The counters are reset
+    // so a reused instance does not report stale edges.
+    serializedEdges = null;
+    serializedEdgesBytesUsed = 0;
+    edgeCount = 0;
+  }
+
+  @Override
+  public void add(Edge<I, E> edge) {
+    // Append after the bytes in use, then adopt the array and position the
+    // output ends with (it may have replaced a buffer that was too small).
+    ExtendedDataOutput extendedDataOutput =
+        getConf().createExtendedDataOutput(
+            serializedEdges, serializedEdgesBytesUsed);
+    try {
+      WritableUtils.writeEdge(extendedDataOutput, edge);
+    } catch (IOException e) {
+      throw new IllegalStateException("add: Failed to write to the new " +
+          "byte array", e);
+    }
+    serializedEdges = extendedDataOutput.getByteArray();
+    serializedEdgesBytesUsed = extendedDataOutput.getPos();
+    ++edgeCount;
+  }
+
+  /**
+   * Remove every edge pointing to the given vertex (all parallel edges).
+   * This is expensive: all edges are deserialized and the remaining bytes
+   * are shifted in place.
+   *
+   * @param targetVertexId Target vertex id of the edges to remove
+   */
+  @Override
+  public void remove(I targetVertexId) {
+    // Pass 1: record the byte range [start, end) of every matching edge.
+    ByteArrayEdgeIterator iterator = new ByteArrayEdgeIterator();
+    List<Integer> foundStartOffsets = new LinkedList<Integer>();
+    List<Integer> foundEndOffsets = new LinkedList<Integer>();
+    int lastStartOffset = 0;
+    while (iterator.hasNext()) {
+      Edge<I, E> edge = iterator.next();
+      if (edge.getTargetVertexId().equals(targetVertexId)) {
+        foundStartOffsets.add(lastStartOffset);
+        foundEndOffsets.add(iterator.extendedDataInput.getPos());
+        --edgeCount;
+      }
+      lastStartOffset = iterator.extendedDataInput.getPos();
+    }
+    // Sentinel: the bytes after the last match are kept too.
+    foundStartOffsets.add(serializedEdgesBytesUsed);
+
+    // Pass 2: slide each run of kept bytes left over the removed ranges.
+    // Offsets refer to the original layout, so each run must move by the
+    // total number of bytes removed so far. Moving it by only the preceding
+    // gap would leave stale bytes behind whenever more than one edge matches.
+    Iterator<Integer> foundStartOffsetIter = foundStartOffsets.iterator();
+    int foundStartOffset = foundStartOffsetIter.next();
+    int removedBytes = 0;
+    for (int foundEndOffset : foundEndOffsets) {
+      int nextFoundStartOffset = foundStartOffsetIter.next();
+      removedBytes += foundEndOffset - foundStartOffset;
+      System.arraycopy(serializedEdges, foundEndOffset,
+          serializedEdges, foundEndOffset - removedBytes,
+          nextFoundStartOffset - foundEndOffset);
+      foundStartOffset = nextFoundStartOffset;
+    }
+    serializedEdgesBytesUsed -= removedBytes;
+  }
+
+  @Override
+  public int size() {
+    return edgeCount;
+  }
+
+  /**
+   * Iterator over the serialized edges. It reuses a single Edge object, so
+   * each returned edge is overwritten by the following call to next().
+   */
+  private class ByteArrayEdgeIterator
+      extends UnmodifiableIterator<Edge<I, E>> {
+    /** Input for processing the bytes */
+    private ExtendedDataInput extendedDataInput =
+        getConf().createExtendedDataInput(
+            serializedEdges, 0, serializedEdgesBytesUsed);
+    /** Representative edge object. */
+    private MutableEdge<I, E> representativeEdge =
+        getConf().createMutableEdge();
+
+    @Override
+    public boolean hasNext() {
+      return serializedEdges != null && extendedDataInput.available() > 0;
+    }
+
+    @Override
+    public Edge<I, E> next() {
+      try {
+        WritableUtils.readEdge(extendedDataInput, representativeEdge);
+      } catch (IOException e) {
+        throw new IllegalStateException("next: Failed on pos " +
+            extendedDataInput.getPos() + " edge " + representativeEdge, e);
+      }
+      return representativeEdge;
+    }
+  }
+
+  @Override
+  public Iterator<Edge<I, E>> iterator() {
+    return new ByteArrayEdgeIterator();
+  }
+
+  /**
+   * Deserialize edges written by {@link #write(DataOutput)}, reusing the
+   * current buffer when it is large enough.
+   *
+   * @param in Input to read from
+   * @throws IOException if reading fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    serializedEdgesBytesUsed = in.readInt();
+    if (serializedEdgesBytesUsed > 0) {
+      // Only create a new buffer if the old one isn't big enough
+      if (serializedEdges == null ||
+          serializedEdgesBytesUsed > serializedEdges.length) {
+        serializedEdges = new byte[serializedEdgesBytesUsed];
+      }
+      in.readFully(serializedEdges, 0, serializedEdgesBytesUsed);
+    }
+    edgeCount = in.readInt();
+  }
+
+  /**
+   * Serialize as: number of bytes used, the bytes themselves, edge count.
+   *
+   * @param out Output to write to
+   * @throws IOException if writing fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(serializedEdgesBytesUsed);
+    if (serializedEdgesBytesUsed > 0) {
+      out.write(serializedEdges, 0, serializedEdgesBytesUsed);
+    }
+    out.writeInt(edgeCount);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/ConfigurableVertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/ConfigurableVertexEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/ConfigurableVertexEdges.java
new file mode 100644
index 0000000..faa12eb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/ConfigurableVertexEdges.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Abstract base class for {@link VertexEdges} implementations that require
+ * access to the configuration.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public abstract class ConfigurableVertexEdges<I extends WritableComparable,
+    E extends Writable>
+    implements VertexEdges<I, E>, ImmutableClassesGiraphConfigurable {
+  /** Configuration; null until setConf() is called. */
+  private ImmutableClassesGiraphConfiguration<I, ?, E, ?> configuration;
+
+  /**
+   * Store the configuration. The parameter is raw to match the overridden
+   * interface method; the argument is assumed to use this object's vertex
+   * id and edge value types (unchecked assignment).
+   *
+   * @param conf Configuration to use
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
+    configuration = conf;
+  }
+
+  /**
+   * Get the configuration passed to {@link #setConf}.
+   *
+   * @return Configuration, or null if setConf() has not been called
+   */
+  @Override
+  public ImmutableClassesGiraphConfiguration<I, ?, E, ?> getConf() {
+    return configuration;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/DefaultEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/DefaultEdge.java b/giraph-core/src/main/java/org/apache/giraph/edge/DefaultEdge.java
new file mode 100644
index 0000000..461bff3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/DefaultEdge.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Default {@link MutableEdge} implementation: holds the id of the target
+ * vertex together with the value stored on the edge.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public class DefaultEdge<I extends WritableComparable, E extends Writable>
+    implements MutableEdge<I, E> {
+  /** Id of the vertex this edge points to. */
+  private I targetVertexId;
+  /** Value attached to this edge. */
+  private E value;
+
+  /**
+   * No-argument constructor for reflective instantiation; both the target
+   * id and the value start out as {@code null}.
+   */
+  public DefaultEdge() { }
+
+  /**
+   * Build an edge with the given target and value. Not meant to be called
+   * directly; go through EdgeFactory instead.
+   *
+   * @param targetVertexId Destination vertex id.
+   * @param value Value of the edge.
+   */
+  DefaultEdge(I targetVertexId, E value) {
+    this.value = value;
+    this.targetVertexId = targetVertexId;
+  }
+
+  @Override
+  public I getTargetVertexId() {
+    return targetVertexId;
+  }
+
+  @Override
+  public void setTargetVertexId(I targetVertexId) {
+    this.targetVertexId = targetVertexId;
+  }
+
+  @Override
+  public E getValue() {
+    return value;
+  }
+
+  @Override
+  public void setValue(E value) {
+    this.value = value;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("(targetVertexId = ").append(targetVertexId)
+        .append(", value = ").append(value).append(')').toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/Edge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/Edge.java b/giraph-core/src/main/java/org/apache/giraph/edge/Edge.java
new file mode 100644
index 0000000..4649da1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/Edge.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A graph edge: the id of the vertex it points to, plus the value stored on
+ * the edge.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public interface Edge<I extends WritableComparable, E extends Writable> {
+  /**
+   * Get the id of the vertex this edge points to.
+   *
+   * @return Target vertex id
+   */
+  I getTargetVertexId();
+
+  /**
+   * Get the value stored on this edge.
+   *
+   * @return Edge value
+   */
+  E getValue();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/EdgeFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeFactory.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeFactory.java
new file mode 100644
index 0000000..3599207
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Factory for creating {@link Edge}s.
+ * Edges whose value is a {@link NullWritable} are represented by the
+ * value-less {@link EdgeNoValue}; all others by {@link DefaultEdge}.
+ */
+public final class EdgeFactory {
+  /** Do not construct: this class only has static factory methods. */
+  private EdgeFactory() { }
+
+  /**
+   * Create an edge pointing to a given id with a value.
+   *
+   * @param id Target vertex id
+   * @param value Edge value
+   * @param <I> Vertex id type
+   * @param <E> Edge value type
+   * @return Edge pointing to {@code id} with {@code value}
+   */
+  public static <I extends WritableComparable,
+      E extends Writable>
+  Edge<I, E> create(I id, E value) {
+    return createMutable(id, value);
+  }
+
+  /**
+   * Create an edge pointing to a given id without a value.
+   *
+   * @param id Target vertex id
+   * @param <I> Vertex id type
+   * @return Edge pointing to {@code id} without a value
+   */
+  public static <I extends WritableComparable>
+  Edge<I, NullWritable> create(I id) {
+    return createMutable(id);
+  }
+
+  /**
+   * Create a mutable edge pointing to a given id with a value.
+   * If {@code value} is a {@link NullWritable}, a value-less edge is
+   * returned; a {@code null} value yields a {@link DefaultEdge} holding
+   * {@code null}.
+   *
+   * @param id Target vertex id
+   * @param value Edge value
+   * @param <I> Vertex id type
+   * @param <E> Edge value type
+   * @return Mutable edge pointing to {@code id} with {@code value}
+   */
+  @SuppressWarnings("unchecked")
+  public static <I extends WritableComparable,
+      E extends Writable>
+  MutableEdge<I, E> createMutable(I id, E value) {
+    if (value instanceof NullWritable) {
+      // value is a NullWritable, so a value-less edge (whose getValue()
+      // returns NullWritable.get()) is a valid MutableEdge<I, E>; the
+      // compiler cannot prove this, hence the unchecked cast.
+      return (MutableEdge<I, E>) createMutable(id);
+    }
+    return new DefaultEdge<I, E>(id, value);
+  }
+
+  /**
+   * Create a mutable edge pointing to a given id without a value.
+   *
+   * @param id Target vertex id
+   * @param <I> Vertex id type
+   * @return Mutable edge pointing to {@code id} without a value
+   */
+  public static <I extends WritableComparable>
+  MutableEdge<I, NullWritable> createMutable(I id) {
+    return new EdgeNoValue<I>(id);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/EdgeNoValue.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeNoValue.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeNoValue.java
new file mode 100644
index 0000000..dd22aec
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeNoValue.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A {@link MutableEdge} that carries no value: {@link #getValue()} always
+ * returns {@code NullWritable.get()} and {@link #setValue} does nothing.
+ *
+ * @param <I> Vertex ID
+ */
+public class EdgeNoValue<I extends WritableComparable>
+    implements MutableEdge<I, NullWritable> {
+  /** Id of the vertex this edge points to. */
+  private I targetVertexId;
+
+  /** Empty constructor; the target id starts out as {@code null}. */
+  EdgeNoValue() { }
+
+  /**
+   * Build an edge pointing to the given vertex. Not meant to be called
+   * directly; go through EdgeFactory instead.
+   *
+   * @param targetVertexId Target vertex ID
+   */
+  EdgeNoValue(I targetVertexId) {
+    this.targetVertexId = targetVertexId;
+  }
+
+  @Override
+  public I getTargetVertexId() {
+    return targetVertexId;
+  }
+
+  @Override
+  public void setTargetVertexId(I targetVertexId) {
+    this.targetVertexId = targetVertexId;
+  }
+
+  @Override
+  public NullWritable getValue() {
+    return NullWritable.get();
+  }
+
+  @Override
+  public void setValue(NullWritable value) {
+    // Intentionally ignored: this edge type has no value to store.
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("(targetVertexId = ").append(targetVertexId)
+        .append(')').toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
new file mode 100644
index 0000000..64569bb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.collect.MapMaker;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Collects incoming edges for vertices owned by this worker.
+ * Edges are buffered per partition and per source vertex by
+ * {@link #addPartitionEdges}, then attached to their source vertices by
+ * {@link #moveEdgesToVertices}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class EdgeStore<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(EdgeStore.class);
+  /** Service worker. */
+  private final CentralizedServiceWorker<I, V, E, M> service;
+  /** Giraph configuration. */
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+  /** Progressable to report progress. */
+  private final Progressable progressable;
+  /**
+   * Map used to temporarily store incoming edges:
+   * partition id -> (source vertex id -> edges of that vertex).
+   */
+  private final ConcurrentMap<Integer,
+      ConcurrentMap<I, VertexEdges<I, E>>> transientEdges;
+  /**
+   * Whether the chosen {@link VertexEdges} implementation allows for Edge
+   * reuse.
+   */
+  private final boolean reuseEdgeObjects;
+
+  /**
+   * Constructor.
+   *
+   * @param service Service worker
+   * @param configuration Configuration
+   * @param progressable Progressable
+   */
+  public EdgeStore(
+      CentralizedServiceWorker<I, V, E, M> service,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      Progressable progressable) {
+    this.service = service;
+    this.configuration = configuration;
+    this.progressable = progressable;
+    transientEdges = new MapMaker().concurrencyLevel(
+        configuration.getNettyServerExecutionConcurrency()).makeMap();
+    reuseEdgeObjects = ReuseObjectsVertexEdges.class.isAssignableFrom(
+        configuration.getVertexEdgesClass());
+  }
+
+  /**
+   * Add edges belonging to a given partition on this worker.
+   * Note: This method is thread-safe.
+   *
+   * @param partitionId Partition id for the incoming edges.
+   * @param edges Incoming edges
+   */
+  public void addPartitionEdges(
+      int partitionId, ByteArrayVertexIdEdges<I, E> edges) {
+    ConcurrentMap<I, VertexEdges<I, E>> partitionEdges =
+        getOrCreatePartitionEdges(partitionId);
+    ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator =
+        edges.getVertexIdEdgeIterator();
+    while (vertexIdEdgeIterator.hasNext()) {
+      vertexIdEdgeIterator.next();
+      I vertexId = vertexIdEdgeIterator.getCurrentVertexId();
+      // ReuseObjectsVertexEdges implementations are presumed not to keep a
+      // reference to the Edge passed to add(), so the iterator's reused Edge
+      // is safe for them; otherwise take a fresh Edge the store can own.
+      Edge<I, E> edge = reuseEdgeObjects ?
+          vertexIdEdgeIterator.getCurrentEdge() :
+          vertexIdEdgeIterator.releaseCurrentEdge();
+      VertexEdges<I, E> vertexEdges = partitionEdges.get(vertexId);
+      if (vertexEdges == null) {
+        // Initialize BEFORE publishing: once the instance is in the map,
+        // other threads may fetch it and call add() on it, which must never
+        // see an uninitialized object. If we lose the putIfAbsent race, the
+        // fresh instance is simply discarded.
+        VertexEdges<I, E> newVertexEdges = configuration.createVertexEdges();
+        newVertexEdges.initialize();
+        vertexEdges = partitionEdges.putIfAbsent(vertexId, newVertexEdges);
+        if (vertexEdges == null) {
+          vertexEdges = newVertexEdges;
+          // Since we had to use the vertex id as a new key in the map,
+          // we need to release the object.
+          vertexIdEdgeIterator.releaseCurrentVertexId();
+        }
+      }
+      synchronized (vertexEdges) {
+        vertexEdges.add(edge);
+      }
+    }
+  }
+
+  /**
+   * Get the per-vertex edge map for a partition, atomically creating it if
+   * it does not exist yet.
+   *
+   * @param partitionId Partition id
+   * @return Map from source vertex id to its buffered edges
+   */
+  private ConcurrentMap<I, VertexEdges<I, E>> getOrCreatePartitionEdges(
+      int partitionId) {
+    ConcurrentMap<I, VertexEdges<I, E>> partitionEdges =
+        transientEdges.get(partitionId);
+    if (partitionEdges == null) {
+      ConcurrentMap<I, VertexEdges<I, E>> newPartitionEdges =
+          new MapMaker().concurrencyLevel(
+              configuration.getNettyServerExecutionConcurrency()).makeMap();
+      partitionEdges = transientEdges.putIfAbsent(partitionId,
+          newPartitionEdges);
+      if (partitionEdges == null) {
+        partitionEdges = newPartitionEdges;
+      }
+    }
+    return partitionEdges;
+  }
+
+  /**
+   * Move all edges from temporary storage to their source vertices.
+   * Source vertices that do not exist yet are created with a default value.
+   * Note: this method is not thread-safe.
+   */
+  public void moveEdgesToVertices() {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("moveEdgesToVertices: Moving incoming edges to vertices.");
+    }
+    for (Map.Entry<Integer, ConcurrentMap<I,
+        VertexEdges<I, E>>> partitionEdges : transientEdges.entrySet()) {
+      Partition<I, V, E, M> partition =
+          service.getPartitionStore().getPartition(partitionEdges.getKey());
+      for (I vertexId : partitionEdges.getValue().keySet()) {
+        VertexEdges<I, E> vertexEdges =
+            partitionEdges.getValue().remove(vertexId);
+        Vertex<I, V, E, M> vertex = partition.getVertex(vertexId);
+        // If the source vertex doesn't exist, create it. Otherwise,
+        // just set the edges.
+        if (vertex == null) {
+          vertex = configuration.createVertex();
+          vertex.initialize(vertexId, configuration.createVertexValue(),
+              vertexEdges);
+          partition.putVertex(vertex);
+        } else {
+          vertex.setEdges(vertexEdges);
+          // Some Partition implementations (e.g. ByteArrayPartition) require
+          // us to put back the vertex after modifying it.
+          partition.saveVertex(vertex);
+        }
+        progressable.progress();
+      }
+      // Some PartitionStore implementations (e.g. DiskBackedPartitionStore)
+      // require us to put back the partition after modifying it.
+      service.getPartitionStore().putPartition(partition);
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("moveEdgesToVertices: Finished moving incoming edges to " +
+          "vertices.");
+    }
+    transientEdges.clear();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java
new file mode 100644
index 0000000..1aa9a46
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link VertexEdges} implementation backed by a {@link HashMap}.
+ * Parallel edges are not allowed: adding an edge to a target that already
+ * has one replaces its value.
+ * Note: this implementation is optimized for fast random access and mutations,
+ * but uses more space.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public class HashMapEdges<I extends WritableComparable, E extends Writable>
+    extends ConfigurableVertexEdges<I, E>
+    implements StrictRandomAccessVertexEdges<I, E> {
+  /** Map from target vertex id to edge value. */
+  private HashMap<I, E> edgeMap;
+
+  /**
+   * Initialize from the given edges. A {@code null} iterable yields an empty
+   * edge map (instead of leaving the map unset or stale), so the instance is
+   * always usable afterwards.
+   *
+   * @param edges Initial edges, may be {@code null}
+   */
+  @Override
+  public void initialize(Iterable<Edge<I, E>> edges) {
+    if (edges == null) {
+      initialize();
+      return;
+    }
+    // If the iterable is actually a collection, we can cheaply get the
+    // size and initialize the hash-map with the expected capacity.
+    if (edges instanceof Collection) {
+      initialize(((Collection<Edge<I, E>>) edges).size());
+    } else {
+      initialize();
+    }
+    for (Edge<I, E> edge : edges) {
+      add(edge);
+    }
+  }
+
+  @Override
+  public void initialize(int capacity) {
+    edgeMap = Maps.newHashMapWithExpectedSize(capacity);
+  }
+
+  @Override
+  public void initialize() {
+    edgeMap = Maps.newHashMap();
+  }
+
+  /**
+   * Add an edge, replacing the value of any existing edge to the same target.
+   * The edge's id and value objects are stored by reference.
+   *
+   * @param edge Edge to add
+   */
+  @Override
+  public void add(Edge<I, E> edge) {
+    edgeMap.put(edge.getTargetVertexId(), edge.getValue());
+  }
+
+  @Override
+  public void remove(I targetVertexId) {
+    edgeMap.remove(targetVertexId);
+  }
+
+  /**
+   * Get the value of the edge to the given target.
+   *
+   * @param targetVertexId Target vertex id
+   * @return Edge value, or {@code null} if there is no such edge
+   */
+  @Override
+  public E getEdgeValue(I targetVertexId) {
+    return edgeMap.get(targetVertexId);
+  }
+
+  @Override
+  public int size() {
+    return edgeMap.size();
+  }
+
+  /**
+   * Iterate over all edges.
+   * Note: the same {@link MutableEdge} object is returned by every call to
+   * {@code next()}, so callers must copy it if they need to keep it.
+   *
+   * @return Unmodifiable iterator over the edges
+   */
+  @Override
+  public Iterator<Edge<I, E>> iterator() {
+    return new UnmodifiableIterator<Edge<I, E>>() {
+      /** Wrapped map iterator. */
+      private Iterator<Map.Entry<I, E>> mapIterator =
+          edgeMap.entrySet().iterator();
+      /** Representative edge object. */
+      private MutableEdge<I, E> representativeEdge =
+          getConf().createMutableEdge();
+
+      @Override
+      public boolean hasNext() {
+        return mapIterator.hasNext();
+      }
+
+      @Override
+      public Edge<I, E> next() {
+        Map.Entry<I, E> nextEntry = mapIterator.next();
+        representativeEdge.setTargetVertexId(nextEntry.getKey());
+        representativeEdge.setValue(nextEntry.getValue());
+        return representativeEdge;
+      }
+    };
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(edgeMap.size());
+    for (Map.Entry<I, E> entry : edgeMap.entrySet()) {
+      entry.getKey().write(out);
+      entry.getValue().write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numEdges = in.readInt();
+    initialize(numEdges);
+    for (int i = 0; i < numEdges; ++i) {
+      I targetVertexId = getConf().createVertexId();
+      targetVertexId.readFields(in);
+      E edgeValue = getConf().createEdgeValue();
+      edgeValue.readFields(in);
+      edgeMap.put(targetVertexId, edgeValue);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/HashMultimapEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/HashMultimapEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/HashMultimapEdges.java
new file mode 100644
index 0000000..143d7a4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/HashMultimapEdges.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link VertexEdges} implementation backed by an {@link ArrayListMultimap}.
+ * Parallel edges are allowed.
+ * Note: this implementation is optimized for fast mutations,
+ * but uses more space.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public class HashMultimapEdges<I extends WritableComparable, E extends Writable>
+ extends ConfigurableVertexEdges<I, E>
+ implements MultiRandomAccessVertexEdges<I, E> {
+ /** Multimap from target vertex id to edge values. */
+ private ArrayListMultimap<I, E> edgeMultimap;
+
+ @Override
+ public void initialize(Iterable<Edge<I, E>> edges) {
+ // If the iterable is actually a collection, we can cheaply get the
+ // size and initialize the hash-multimap with the expected capacity.
+ if (edges instanceof Collection) {
+ initialize(((Collection<Edge<I, E>>) edges).size());
+ } else {
+ initialize();
+ }
+ for (Edge<I, E> edge : edges) {
+ add(edge);
+ }
+ }
+
+ /**
+ * Additional initialization method tailored to the underlying multimap
+ * implementation.
+ *
+ * @param expectedNeighbors Expected number of unique neighbors
+ * @param expectedEdgesPerNeighbor Expected number of edges per neighbor
+ */
+ public void initialize(int expectedNeighbors, int expectedEdgesPerNeighbor) {
+ edgeMultimap = ArrayListMultimap.create(expectedNeighbors,
+ expectedEdgesPerNeighbor);
+ }
+
+ @Override
+ public void initialize(int capacity) {
+ // To be conservative in terms of space usage, we assume that the initial
+ // number of values per key is 1.
+ initialize(capacity, 1);
+ }
+
+ @Override
+ public void initialize() {
+ edgeMultimap = ArrayListMultimap.create();
+ }
+
+ @Override
+ public void add(Edge<I, E> edge) {
+ edgeMultimap.put(edge.getTargetVertexId(), edge.getValue());
+ }
+
+ @Override
+ public void remove(I targetVertexId) {
+ edgeMultimap.removeAll(targetVertexId);
+ }
+
+ @Override
+ public Iterable<E> getAllEdgeValues(I targetVertexId) {
+ return edgeMultimap.get(targetVertexId);
+ }
+
+ @Override
+ public int size() {
+ return edgeMultimap.size();
+ }
+
+ @Override
+ public Iterator<Edge<I, E>> iterator() {
+ // Returns an iterator that reuses objects.
+ return new UnmodifiableIterator<Edge<I, E>>() {
+ /** Wrapped map iterator. */
+ private Iterator<Map.Entry<I, E>> mapIterator =
+ edgeMultimap.entries().iterator();
+ /** Representative edge object. */
+ private MutableEdge<I, E> representativeEdge =
+ getConf().createMutableEdge();
+
+ @Override
+ public boolean hasNext() {
+ return mapIterator.hasNext();
+ }
+
+ @Override
+ public Edge<I, E> next() {
+ Map.Entry<I, E> nextEntry = mapIterator.next();
+ representativeEdge.setTargetVertexId(nextEntry.getKey());
+ representativeEdge.setValue(nextEntry.getValue());
+ return representativeEdge;
+ }
+ };
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // We write both the total number of edges and the number of unique
+ // neighbors.
+ out.writeInt(edgeMultimap.size());
+ out.writeInt(edgeMultimap.keys().size());
+ for (Map.Entry<I, E> edge : edgeMultimap.entries()) {
+ edge.getKey().write(out);
+ edge.getValue().write(out);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ // Given the total number of pairs and the number of unique neighbors,
+ // we are able to compute the average number of edges per neighbors.
+ int numEdges = in.readInt();
+ int numNeighbors = in.readInt();
+ initialize(numEdges, numNeighbors == 0 ? 0 : numEdges / numNeighbors);
+ for (int i = 0; i < numEdges; ++i) {
+ I targetVertexId = getConf().createVertexId();
+ targetVertexId.readFields(in);
+ E edgeValue = getConf().createEdgeValue();
+ edgeValue.readFields(in);
+ edgeMultimap.put(targetVertexId, edgeValue);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java
new file mode 100644
index 0000000..9df58a9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.collect.UnmodifiableIterator;
+import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import it.unimi.dsi.fastutil.doubles.DoubleIterator;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * Implementation of {@link VertexEdges} with long ids and double edge
+ * values, backed by dynamic primitive arrays.
+ * Parallel edges are allowed.
+ * Note: this implementation is optimized for space usage,
+ * but edge removals are expensive.
+ */
+public class LongDoubleArrayEdges
+    extends ConfigurableVertexEdges<LongWritable, DoubleWritable>
+    implements ReuseObjectsVertexEdges<LongWritable, DoubleWritable> {
+  /** Array of target vertex ids. */
+  private LongArrayList neighbors;
+  /** Array of edge values, index-aligned with {@link #neighbors}. */
+  private DoubleArrayList edgeValues;
+
+  /**
+   * Initialize from the given edges. A {@code null} iterable yields an empty
+   * edge list, so the instance is always usable afterwards.
+   *
+   * @param edges Initial edges, may be {@code null}
+   */
+  @Override
+  public void initialize(Iterable<Edge<LongWritable, DoubleWritable>> edges) {
+    if (edges == null) {
+      initialize();
+      return;
+    }
+    // If the iterable is actually a collection, we can cheaply get the
+    // size and initialize the arrays with the expected capacity.
+    if (edges instanceof Collection) {
+      int numEdges =
+          ((Collection<Edge<LongWritable, DoubleWritable>>) edges).size();
+      initialize(numEdges);
+    } else {
+      initialize();
+    }
+    for (Edge<LongWritable, DoubleWritable> edge : edges) {
+      add(edge);
+    }
+  }
+
+  @Override
+  public void initialize(int capacity) {
+    neighbors = new LongArrayList(capacity);
+    edgeValues = new DoubleArrayList(capacity);
+  }
+
+  @Override
+  public void initialize() {
+    neighbors = new LongArrayList();
+    edgeValues = new DoubleArrayList();
+  }
+
+  /**
+   * Add an edge. Only the primitive id and value are copied out, so the
+   * caller remains free to reuse the Edge object.
+   *
+   * @param edge Edge to add
+   */
+  @Override
+  public void add(Edge<LongWritable, DoubleWritable> edge) {
+    neighbors.add(edge.getTargetVertexId().get());
+    edgeValues.add(edge.getValue().get());
+  }
+
+  /**
+   * If the backing arrays are more than four times as big as the number of
+   * elements, halve their size. Both arrays are trimmed to the same target,
+   * computed once before either is modified.
+   */
+  private void trim() {
+    int capacity = neighbors.elements().length;
+    if (capacity > 4 * neighbors.size()) {
+      int newCapacity = capacity / 2;
+      neighbors.trim(newCapacity);
+      edgeValues.trim(newCapacity);
+    }
+  }
+
+  /**
+   * Remove edge at position i. Does not preserve the order of edges.
+   *
+   * @param i Position of edge to be removed
+   */
+  private void remove(int i) {
+    // The order of the edges is irrelevant, so we can simply replace
+    // the deleted edge with the rightmost element, thus achieving constant
+    // time.
+    if (i == neighbors.size() - 1) {
+      neighbors.popLong();
+      edgeValues.popDouble();
+    } else {
+      neighbors.set(i, neighbors.popLong());
+      edgeValues.set(i, edgeValues.popDouble());
+    }
+  }
+
+  /**
+   * Remove all edges pointing to the given target, then shrink the backing
+   * arrays if they became much larger than needed.
+   *
+   * @param targetVertexId Target vertex id
+   */
+  @Override
+  public void remove(LongWritable targetVertexId) {
+    // Thanks to the constant-time implementation of remove(int), and since
+    // scanning backwards only ever moves already-checked elements into
+    // position i, we can remove all matching edges in linear time.
+    for (int i = neighbors.size() - 1; i >= 0; --i) {
+      if (neighbors.get(i) == targetVertexId.get()) {
+        remove(i);
+      }
+    }
+    trim();
+  }
+
+  @Override
+  public int size() {
+    return neighbors.size();
+  }
+
+  /**
+   * Iterate over all edges.
+   * Note: the same Edge object (and its id/value writables) is returned by
+   * every call to {@code next()}, so callers must copy it to keep it.
+   *
+   * @return Unmodifiable iterator over the edges
+   */
+  @Override
+  public Iterator<Edge<LongWritable, DoubleWritable>> iterator() {
+    return new UnmodifiableIterator<Edge<LongWritable, DoubleWritable>>() {
+      /** Wrapped neighbors iterator. */
+      private LongIterator neighborsIt = neighbors.iterator();
+      /** Wrapped edge values iterator. */
+      private DoubleIterator edgeValuesIt = edgeValues.iterator();
+      /** Representative edge object. */
+      private Edge<LongWritable, DoubleWritable> representativeEdge =
+          getConf().createEdge();
+
+      @Override
+      public boolean hasNext() {
+        return neighborsIt.hasNext();
+      }
+
+      @Override
+      public Edge<LongWritable, DoubleWritable> next() {
+        representativeEdge.getTargetVertexId().set(neighborsIt.nextLong());
+        representativeEdge.getValue().set(edgeValuesIt.nextDouble());
+        return representativeEdge;
+      }
+    };
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(neighbors.size());
+    LongIterator neighborsIt = neighbors.iterator();
+    DoubleIterator edgeValuesIt = edgeValues.iterator();
+    while (neighborsIt.hasNext()) {
+      out.writeLong(neighborsIt.nextLong());
+      out.writeDouble(edgeValuesIt.nextDouble());
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numEdges = in.readInt();
+    initialize(numEdges);
+    for (int i = 0; i < numEdges; ++i) {
+      neighbors.add(in.readLong());
+      edgeValues.add(in.readDouble());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java
new file mode 100644
index 0000000..6d17b4b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.collect.UnmodifiableIterator;
+import it.unimi.dsi.fastutil.longs.Long2DoubleMap;
+import it.unimi.dsi.fastutil.longs.Long2DoubleOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * {@link VertexEdges} implementation with long ids and double edge values,
+ * backed by a {@link Long2DoubleOpenHashMap}.
+ * Parallel edges are not allowed.
+ * Note: this implementation is optimized for fast random access and mutations,
+ * and uses less space than a generic {@link HashMapEdges} (but more than
+ * {@link LongDoubleArrayEdges}.
+ */
+public class LongDoubleHashMapEdges
+ extends ConfigurableVertexEdges<LongWritable, DoubleWritable>
+ implements StrictRandomAccessVertexEdges<LongWritable, DoubleWritable>,
+ ReuseObjectsVertexEdges<LongWritable, DoubleWritable> {
+ /** Hash map from target vertex id to edge value. */
+ private Long2DoubleOpenHashMap edgeMap;
+ /** Representative edge value object, used by getEdgeValue(). */
+ private DoubleWritable representativeEdgeValue;
+
+ @Override
+ public void initialize(Iterable<Edge<LongWritable, DoubleWritable>> edges) {
+ if (edges != null) {
+ // If the iterable is actually a collection, we can cheaply get the
+ // size and initialize the hash-map with the expected capacity.
+ if (edges instanceof Collection) {
+ initialize(
+ ((Collection<Edge<LongWritable, DoubleWritable>>) edges).size());
+ } else {
+ initialize();
+ }
+ for (Edge<LongWritable, DoubleWritable> edge : edges) {
+ add(edge);
+ }
+ }
+ }
+
+ @Override
+ public void initialize(int capacity) {
+ edgeMap = new Long2DoubleOpenHashMap(capacity);
+ }
+
+ @Override
+ public void initialize() {
+ edgeMap = new Long2DoubleOpenHashMap();
+ }
+
+ @Override
+ public void add(Edge<LongWritable, DoubleWritable> edge) {
+ edgeMap.put(edge.getTargetVertexId().get(), edge.getValue().get());
+ }
+
+ @Override
+ public void remove(LongWritable targetVertexId) {
+ edgeMap.remove(targetVertexId.get());
+ }
+
+ @Override
+ public DoubleWritable getEdgeValue(LongWritable targetVertexId) {
+ if (!edgeMap.containsKey(targetVertexId.get())) {
+ return null;
+ }
+ if (representativeEdgeValue == null) {
+ representativeEdgeValue = getConf().createEdgeValue();
+ }
+ representativeEdgeValue.set(edgeMap.get(targetVertexId.get()));
+ return representativeEdgeValue;
+ }
+
+ @Override
+ public int size() {
+ return edgeMap.size();
+ }
+
+ @Override
+ public Iterator<Edge<LongWritable, DoubleWritable>> iterator() {
+ // Returns an iterator that reuses objects.
+ return new UnmodifiableIterator<Edge<LongWritable, DoubleWritable>>() {
+ /** Wrapped map iterator. */
+ private ObjectIterator<Long2DoubleMap.Entry> mapIterator =
+ edgeMap.long2DoubleEntrySet().fastIterator();
+ /** Representative edge object. */
+ private MutableEdge<LongWritable, DoubleWritable> representativeEdge =
+ getConf().createMutableEdge();
+
+ @Override
+ public boolean hasNext() {
+ return mapIterator.hasNext();
+ }
+
+ @Override
+ public Edge<LongWritable, DoubleWritable> next() {
+ Long2DoubleMap.Entry nextEntry = mapIterator.next();
+ representativeEdge.getTargetVertexId().set(nextEntry.getLongKey());
+ representativeEdge.getValue().set(nextEntry.getDoubleValue());
+ return representativeEdge;
+ }
+ };
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(edgeMap.size());
+ for (Long2DoubleMap.Entry entry : edgeMap.long2DoubleEntrySet()) {
+ out.writeLong(entry.getLongKey());
+ out.writeDouble(entry.getDoubleValue());
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int numEdges = in.readInt();
+ initialize(numEdges);
+ for (int i = 0; i < numEdges; ++i) {
+ edgeMap.put(in.readLong(), in.readDouble());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java
new file mode 100644
index 0000000..a3b869a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.collect.UnmodifiableIterator;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * Implementation of {@link VertexEdges} with long ids and null edge
+ * values, backed by a dynamic primitive array.
+ * Parallel edges are allowed.
+ * Note: this implementation is optimized for space usage,
+ * but random access and edge removals are expensive.
+ */
+public class LongNullArrayEdges
+    extends ConfigurableVertexEdges<LongWritable, NullWritable>
+    implements ReuseObjectsVertexEdges<LongWritable, NullWritable> {
+  /** Array of target vertex ids. */
+  private LongArrayList neighbors;
+
+  /**
+   * Replace the current edges with the given ones. A null iterable results
+   * in an empty (but usable) edge list.
+   *
+   * @param edges Edges to store; may be null
+   */
+  @Override
+  public void initialize(Iterable<Edge<LongWritable, NullWritable>> edges) {
+    // If the iterable is actually a collection, we can cheaply get the
+    // size and initialize the array with the expected capacity.
+    if (edges instanceof Collection) {
+      int numEdges =
+          ((Collection<Edge<LongWritable, NullWritable>>) edges).size();
+      initialize(numEdges);
+    } else {
+      // Also taken when edges is null, so the list is always (re)created and
+      // is never left null or holding edges from a previous initialization.
+      initialize();
+    }
+    if (edges != null) {
+      for (Edge<LongWritable, NullWritable> edge : edges) {
+        add(edge);
+      }
+    }
+  }
+
+  @Override
+  public void initialize(int capacity) {
+    neighbors = new LongArrayList(capacity);
+  }
+
+  @Override
+  public void initialize() {
+    neighbors = new LongArrayList();
+  }
+
+  @Override
+  public void add(Edge<LongWritable, NullWritable> edge) {
+    neighbors.add(edge.getTargetVertexId().get());
+  }
+
+  /**
+   * If the backing array is more than four times as big as the number of
+   * elements, halve its size.
+   */
+  private void trim() {
+    int capacity = neighbors.elements().length;
+    // Widen to long so that 4 * size() cannot overflow for very large lists.
+    if (capacity > 4L * neighbors.size()) {
+      neighbors.trim(capacity / 2);
+    }
+  }
+
+  /**
+   * Remove the edge at position i in constant time. The order of the edges
+   * is irrelevant, so the freed slot is filled with the last element.
+   *
+   * @param i Position of edge to be removed
+   */
+  private void remove(int i) {
+    if (i == neighbors.size() - 1) {
+      neighbors.popLong();
+    } else {
+      neighbors.set(i, neighbors.popLong());
+    }
+  }
+
+  /**
+   * Remove every edge pointing to the given target vertex (including
+   * parallel edges), then shrink the backing array if it became too sparse.
+   *
+   * @param targetVertexId Target vertex id
+   */
+  @Override
+  public void remove(LongWritable targetVertexId) {
+    long targetId = targetVertexId.get();
+    // Scanning right-to-left means any element that remove(int) swaps into
+    // position i has already been examined, so a single linear pass suffices.
+    for (int i = neighbors.size() - 1; i >= 0; --i) {
+      if (neighbors.getLong(i) == targetId) {
+        remove(i);
+      }
+    }
+    trim();
+  }
+
+  @Override
+  public int size() {
+    return neighbors.size();
+  }
+
+  /**
+   * Iterate over the edges. The same Edge object is returned by every call
+   * to next(), so callers must copy it if they need to retain it.
+   *
+   * @return Iterator over the edges (does not support removal)
+   */
+  @Override
+  public Iterator<Edge<LongWritable, NullWritable>> iterator() {
+    // Returns an iterator that reuses objects.
+    return new UnmodifiableIterator<Edge<LongWritable, NullWritable>>() {
+      /** Wrapped neighbors iterator. */
+      private LongIterator neighborsIt = neighbors.iterator();
+      /** Representative edge object. */
+      private Edge<LongWritable, NullWritable> representativeEdge =
+          getConf().createEdge();
+
+      @Override
+      public boolean hasNext() {
+        return neighborsIt.hasNext();
+      }
+
+      @Override
+      public Edge<LongWritable, NullWritable> next() {
+        representativeEdge.getTargetVertexId().set(neighborsIt.nextLong());
+        return representativeEdge;
+      }
+    };
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // Serialized layout: edge count, then one target id per edge.
+    out.writeInt(neighbors.size());
+    LongIterator neighborsIt = neighbors.iterator();
+    while (neighborsIt.hasNext()) {
+      out.writeLong(neighborsIt.nextLong());
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numEdges = in.readInt();
+    initialize(numEdges);
+    for (int i = 0; i < numEdges; ++i) {
+      neighbors.add(in.readLong());
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java
new file mode 100644
index 0000000..70e69c4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.collect.UnmodifiableIterator;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * {@link VertexEdges} implementation with long ids and null edge values,
+ * backed by a {@link LongOpenHashSet}.
+ * Parallel edges are not allowed.
+ * Note: this implementation is optimized for fast random access and mutations,
+ * and uses less space than a generic {@link HashMapEdges} (but more than
+ * {@link LongNullArrayEdges}).
+ */
+public class LongNullHashSetEdges
+    extends ConfigurableVertexEdges<LongWritable, NullWritable>
+    implements StrictRandomAccessVertexEdges<LongWritable, NullWritable>,
+    ReuseObjectsVertexEdges<LongWritable, NullWritable> {
+  /** Hash set of target vertex ids. */
+  private LongOpenHashSet neighbors;
+
+  /**
+   * Replace the current edges with the given ones. A null iterable results
+   * in an empty (but usable) edge set.
+   *
+   * @param edges Edges to store; may be null
+   */
+  @Override
+  public void initialize(Iterable<Edge<LongWritable, NullWritable>> edges) {
+    // If the iterable is actually a collection, we can cheaply get the
+    // size and initialize the hash-set with the expected capacity.
+    if (edges instanceof Collection) {
+      initialize(
+          ((Collection<Edge<LongWritable, NullWritable>>) edges).size());
+    } else {
+      // Also taken when edges is null, so the set is always (re)created and
+      // is never left null or holding edges from a previous initialization.
+      initialize();
+    }
+    if (edges != null) {
+      for (Edge<LongWritable, NullWritable> edge : edges) {
+        add(edge);
+      }
+    }
+  }
+
+  @Override
+  public void initialize(int capacity) {
+    neighbors = new LongOpenHashSet(capacity);
+  }
+
+  @Override
+  public void initialize() {
+    neighbors = new LongOpenHashSet();
+  }
+
+  /**
+   * Add an edge. Adding an edge to a target that is already a neighbor has
+   * no effect, since parallel edges are not allowed.
+   *
+   * @param edge Edge to add
+   */
+  @Override
+  public void add(Edge<LongWritable, NullWritable> edge) {
+    neighbors.add(edge.getTargetVertexId().get());
+  }
+
+  @Override
+  public void remove(LongWritable targetVertexId) {
+    neighbors.remove(targetVertexId.get());
+  }
+
+  /**
+   * Return the value of the edge pointing to the given target vertex.
+   *
+   * @param targetVertexId Target vertex id
+   * @return {@link NullWritable#get()} if there is an edge to the target,
+   *         null if there is no such edge
+   */
+  @Override
+  public NullWritable getEdgeValue(LongWritable targetVertexId) {
+    // A missing edge must be reported as null; returning the NullWritable
+    // singleton unconditionally would claim edges that do not exist.
+    return neighbors.contains(targetVertexId.get()) ?
+        NullWritable.get() : null;
+  }
+
+  @Override
+  public int size() {
+    return neighbors.size();
+  }
+
+  /**
+   * Iterate over the edges. The same Edge object is returned by every call
+   * to next(), so callers must copy it if they need to retain it.
+   *
+   * @return Iterator over the edges (does not support removal)
+   */
+  @Override
+  public Iterator<Edge<LongWritable, NullWritable>> iterator() {
+    // Returns an iterator that reuses objects.
+    return new UnmodifiableIterator<Edge<LongWritable, NullWritable>>() {
+      /** Wrapped neighbors iterator. */
+      private LongIterator neighborsIt = neighbors.iterator();
+      /** Representative edge object. */
+      private MutableEdge<LongWritable, NullWritable> representativeEdge =
+          getConf().createMutableEdge();
+
+      @Override
+      public boolean hasNext() {
+        return neighborsIt.hasNext();
+      }
+
+      @Override
+      public Edge<LongWritable, NullWritable> next() {
+        representativeEdge.getTargetVertexId().set(neighborsIt.nextLong());
+        return representativeEdge;
+      }
+    };
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // Serialized layout: edge count, then one target id per edge.
+    out.writeInt(neighbors.size());
+    LongIterator neighborsIt = neighbors.iterator();
+    while (neighborsIt.hasNext()) {
+      out.writeLong(neighborsIt.nextLong());
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numEdges = in.readInt();
+    initialize(numEdges);
+    for (int i = 0; i < numEdges; ++i) {
+      neighbors.add(in.readLong());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/MultiRandomAccessVertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/MultiRandomAccessVertexEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/MultiRandomAccessVertexEdges.java
new file mode 100644
index 0000000..9f8658e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/MultiRandomAccessVertexEdges.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * {@link VertexEdges} that can efficiently look up edges by their target
+ * vertex id, for multigraphs: several parallel edges may point to the same
+ * target vertex.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public interface MultiRandomAccessVertexEdges<
+    I extends WritableComparable, E extends Writable>
+    extends VertexEdges<I, E> {
+  /**
+   * Look up the values of all edges pointing to the given target vertex.
+   *
+   * @param targetVertexId Id of the target vertex to look up
+   * @return Iterable over the values of the edges to that target
+   */
+  Iterable<E> getAllEdgeValues(I targetVertexId);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdge.java b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdge.java
new file mode 100644
index 0000000..bf00b4f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdge.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * An edge (target vertex id plus edge value) whose target vertex id and
+ * value can both be replaced through setters.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public interface MutableEdge<I extends WritableComparable, E extends Writable>
+    extends Edge<I, E> {
+  /**
+   * Set the target vertex id of this edge.
+   *
+   * @param targetVertexId New target vertex id
+   */
+  void setTargetVertexId(I targetVertexId);
+
+  /**
+   * Set the value of this edge.
+   *
+   * @param value New edge value
+   */
+  void setValue(E value);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/ReuseObjectsVertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/ReuseObjectsVertexEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/ReuseObjectsVertexEdges.java
new file mode 100644
index 0000000..7704baf
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/ReuseObjectsVertexEdges.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Marker interface for {@link VertexEdges} implementations that never keep
+ * references to the Edge objects (or their id and value objects) handed to
+ * them. Because of this, the Giraph infrastructure is free to reuse Edge
+ * objects when passing them in.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public interface ReuseObjectsVertexEdges<
+    I extends WritableComparable, E extends Writable>
+    extends VertexEdges<I, E> {
+  // Intentionally empty: implementing this interface is the whole contract.
+}