You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by pa...@apache.org on 2014/06/01 22:32:24 UTC
git commit: updated refs/heads/trunk to 9cedc7d
Repository: giraph
Updated Branches:
refs/heads/trunk f0b6cddd3 -> 9cedc7d76
GIRAPH-873 : Specialized edge stores
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/9cedc7d7
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/9cedc7d7
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/9cedc7d7
Branch: refs/heads/trunk
Commit: 9cedc7d76f2bbe52b3d1cc4caf8024e730266f83
Parents: f0b6cdd
Author: Pavan Kumar <pa...@fb.com>
Authored: Sun Jun 1 13:29:38 2014 -0700
Committer: Pavan Kumar <pa...@fb.com>
Committed: Sun Jun 1 13:31:42 2014 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../java/org/apache/giraph/comm/ServerData.java | 5 +-
.../org/apache/giraph/conf/GiraphConstants.java | 9 +
.../ImmutableClassesGiraphConfiguration.java | 12 +
.../apache/giraph/edge/AbstractEdgeStore.java | 276 +++++++++++++++++++
.../java/org/apache/giraph/edge/EdgeStore.java | 205 +-------------
.../apache/giraph/edge/EdgeStoreFactory.java | 54 ++++
.../giraph/edge/InMemoryEdgeStoreFactory.java | 79 ++++++
.../org/apache/giraph/edge/SimpleEdgeStore.java | 123 +++++++++
.../giraph/edge/primitives/IntEdgeStore.java | 133 +++++++++
.../giraph/edge/primitives/LongEdgeStore.java | 134 +++++++++
.../giraph/edge/primitives/package-info.java | 21 ++
12 files changed, 851 insertions(+), 202 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index d1663c4..36af911 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-873: Specialized edge stores
+
GIRAPH-898: Remove giraph-accumulo from Facebook profile (edunov via majakabiljo)
GIRAPH-896: Fix memory leak in SuperstepMetricsRegistry (edunov via pavanka)
http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 5a217d4..f0ecca2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -26,6 +26,7 @@ import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.EdgeStore;
+import org.apache.giraph.edge.EdgeStoreFactory;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.partition.DiskBackedPartitionStore;
import org.apache.giraph.partition.PartitionStore;
@@ -108,7 +109,9 @@ public class ServerData<I extends WritableComparable,
partitionStore =
new SimplePartitionStore<I, V, E>(conf, context);
}
- edgeStore = new EdgeStore<I, V, E>(service, conf, context);
+ EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory();
+ edgeStoreFactory.initialize(service, conf, context);
+ edgeStore = edgeStoreFactory.newStore();
ownerAggregatorData = new OwnerAggregatorServerData(context, conf);
allAggregatorData = new AllAggregatorServerData(context, conf);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 7f1317f..6b36418 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -23,6 +23,8 @@ import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.edge.EdgeStoreFactory;
+import org.apache.giraph.edge.InMemoryEdgeStoreFactory;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.factories.ComputationFactory;
import org.apache.giraph.factories.DefaultComputationFactory;
@@ -94,6 +96,13 @@ public interface GiraphConstants {
TypesHolder.class,
"TypesHolder, used if Computation not set - optional");
+ /** Edge Store Factory */
+ ClassConfOption<EdgeStoreFactory> EDGE_STORE_FACTORY_CLASS =
+ ClassConfOption.create("giraph.edgeStoreFactoryClass",
+ InMemoryEdgeStoreFactory.class,
+ EdgeStoreFactory.class,
+ "Edge Store Factory class to use for creating edgeStore");
+
/** Message Store Factory */
ClassConfOption<MessageStoreFactory> MESSAGE_STORE_FACTORY_CLASS =
ClassConfOption.create("giraph.messageStoreFactoryClass",
http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 2e8c935..95e029d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.edge.EdgeStoreFactory;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.edge.ReusableEdge;
import org.apache.giraph.factories.ComputationFactory;
@@ -768,6 +769,17 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
+ * Create edge store factory
+ *
+ * @return edge store factory
+ */
+ public EdgeStoreFactory<I, V, E> createEdgeStoreFactory() {
+ Class<? extends EdgeStoreFactory> edgeStoreFactoryClass =
+ EDGE_STORE_FACTORY_CLASS.get(this);
+ return ReflectionUtils.newInstance(edgeStoreFactoryClass);
+ }
+
+ /**
* Get the user's subclassed incoming message value class.
*
* @param <M> Message data
http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
new file mode 100644
index 0000000..80e909d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.collect.MapMaker;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Basic implementation of edges store, extended this to easily define simple
+ * and primitive edge stores
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <K> Key corresponding to Vertex id
+ * @param <Et> Entry type
+ */
+public abstract class AbstractEdgeStore<I extends WritableComparable,
+ V extends Writable, E extends Writable, K, Et>
+ extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+ implements EdgeStore<I, V, E> {
+ /** Class logger */
+ private static final Logger LOG = Logger.getLogger(AbstractEdgeStore.class);
+ /** Service worker. */
+ protected CentralizedServiceWorker<I, V, E> service;
+ /** Giraph configuration. */
+ protected ImmutableClassesGiraphConfiguration<I, V, E> configuration;
+ /** Progressable to report progress. */
+ protected Progressable progressable;
+ /** Map used to temporarily store incoming edges. */
+ protected ConcurrentMap<Integer, Map<K, OutEdges<I, E>>> transientEdges;
+ /**
+ * Whether the chosen {@link OutEdges} implementation allows for Edge
+ * reuse.
+ */
+ protected boolean reuseEdgeObjects;
+ /**
+ * Whether the {@link OutEdges} class used during input is different
+ * from the one used during computation.
+ */
+ protected boolean useInputOutEdges;
+
+ /**
+ * Constructor.
+ *
+ * @param service Service worker
+ * @param configuration Configuration
+ * @param progressable Progressable
+ */
+ public AbstractEdgeStore(
+ CentralizedServiceWorker<I, V, E> service,
+ ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+ Progressable progressable) {
+ this.service = service;
+ this.configuration = configuration;
+ this.progressable = progressable;
+ transientEdges = new MapMaker().concurrencyLevel(
+ configuration.getNettyServerExecutionConcurrency()).makeMap();
+ reuseEdgeObjects = configuration.reuseEdgeObjects();
+ useInputOutEdges = configuration.useInputOutEdges();
+ }
+
+ /**
+ * Get vertexId for a given key
+ *
+ * @param entry for vertexId key
+ * @param representativeVertexId representativeVertexId
+ * @return vertex Id
+ */
+ protected abstract I getVertexId(Et entry, I representativeVertexId);
+
+ /**
+ * Create vertexId from a given key
+ *
+ * @param entry for vertexId key
+ * @return new vertexId
+ */
+ protected abstract I createVertexId(Et entry);
+
+ /**
+ * Get OutEdges for a given partition
+ *
+ * @param partitionId id of partition
+ * @return OutEdges for the partition
+ */
+ protected abstract Map<K, OutEdges<I, E>> getPartitionEdges(int partitionId);
+
+ /**
+ * Remove and return the OutEdges for a given partition
+ *
+ * @param entry for vertexId key
+ * @param partitionEdges map of out-edges for vertices in a partition
+ * @return out edges
+ */
+ protected abstract OutEdges<I, E> removePartitionEdges(Et entry,
+ Map<K, OutEdges<I, E>> partitionEdges);
+
+ /**
+ * Get iterator for partition edges
+ *
+ * @param partitionEdges map of out-edges for vertices in a partition
+ * @return iterator
+ */
+ protected abstract Iterator<Et>
+ getPartitionEdgesIterator(Map<K, OutEdges<I, E>> partitionEdges);
+
+ /**
+ * Get out-edges for a given vertex
+ *
+ * @param vertexIdEdgeIterator vertex Id Edge iterator
+ * @param partitionEdgesIn map of out-edges for vertices in a partition
+ * @return out-edges for the vertex
+ */
+ protected abstract OutEdges<I, E> getVertexOutEdges(
+ ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator,
+ Map<K, OutEdges<I, E>> partitionEdgesIn);
+
+ @Override
+ public void addPartitionEdges(
+ int partitionId, ByteArrayVertexIdEdges<I, E> edges) {
+ Map<K, OutEdges<I, E>> partitionEdges = getPartitionEdges(partitionId);
+
+ ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator =
+ edges.getVertexIdEdgeIterator();
+ while (vertexIdEdgeIterator.hasNext()) {
+ vertexIdEdgeIterator.next();
+ Edge<I, E> edge = reuseEdgeObjects ?
+ vertexIdEdgeIterator.getCurrentEdge() :
+ vertexIdEdgeIterator.releaseCurrentEdge();
+ OutEdges<I, E> outEdges = getVertexOutEdges(vertexIdEdgeIterator,
+ partitionEdges);
+ synchronized (outEdges) {
+ outEdges.add(edge);
+ }
+ }
+ }
+
+ /**
+ * Convert the input edges to the {@link OutEdges} data structure used
+ * for computation (if different).
+ *
+ * @param inputEdges Input edges
+ * @return Compute edges
+ */
+ private OutEdges<I, E> convertInputToComputeEdges(
+ OutEdges<I, E> inputEdges) {
+ if (!useInputOutEdges) {
+ return inputEdges;
+ } else {
+ return configuration.createAndInitializeOutEdges(inputEdges);
+ }
+ }
+
+ @Override
+ public void moveEdgesToVertices() {
+ final boolean createSourceVertex = configuration.getCreateSourceVertex();
+ if (transientEdges.isEmpty()) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("moveEdgesToVertices: No edges to move");
+ }
+ return;
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("moveEdgesToVertices: Moving incoming edges to vertices.");
+ }
+
+ final BlockingQueue<Integer> partitionIdQueue =
+ new ArrayBlockingQueue<>(transientEdges.size());
+ partitionIdQueue.addAll(transientEdges.keySet());
+ int numThreads = configuration.getNumInputSplitsThreads();
+
+ CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+ @Override
+ public Callable<Void> newCallable(int callableId) {
+ return new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ Integer partitionId;
+ I representativeVertexId = configuration.createVertexId();
+ while ((partitionId = partitionIdQueue.poll()) != null) {
+ Partition<I, V, E> partition =
+ service.getPartitionStore().getOrCreatePartition(partitionId);
+ Map<K, OutEdges<I, E>> partitionEdges =
+ transientEdges.remove(partitionId);
+ Iterator<Et> iterator =
+ getPartitionEdgesIterator(partitionEdges);
+ // process all vertices in given partition
+ while (iterator.hasNext()) {
+ Et entry = iterator.next();
+ I vertexId = getVertexId(entry,
+ representativeVertexId);
+ OutEdges<I, E> outEdges = convertInputToComputeEdges(
+ removePartitionEdges(entry, partitionEdges));
+ Vertex<I, V, E> vertex = partition.getVertex(vertexId);
+ // If the source vertex doesn't exist, create it. Otherwise,
+ // just set the edges.
+ if (vertex == null) {
+ if (createSourceVertex) {
+ // createVertex only if it is allowed by configuration
+ vertex = configuration.createVertex();
+ vertex.initialize(createVertexId(entry),
+ configuration.createVertexValue(), outEdges);
+ partition.putVertex(vertex);
+ }
+ } else {
+ // A vertex may exist with or without edges initially
+ // and optimize the case of no initial edges
+ if (vertex.getNumEdges() == 0) {
+ vertex.setEdges(outEdges);
+ } else {
+ for (Edge<I, E> edge : outEdges) {
+ vertex.addEdge(edge);
+ }
+ }
+ // Some Partition implementations (e.g. ByteArrayPartition)
+ // require us to put back the vertex after modifying it.
+ partition.saveVertex(vertex);
+ }
+ }
+ // Some PartitionStore implementations
+ // (e.g. DiskBackedPartitionStore) require us to put back the
+ // partition after modifying it.
+ service.getPartitionStore().putPartition(partition);
+ }
+ return null;
+ }
+ };
+ }
+ };
+ ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
+ "move-edges-%d", progressable);
+
+ // remove all entries
+ transientEdges.clear();
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("moveEdgesToVertices: Finished moving incoming edges to " +
+ "vertices.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
index 57ad387..1150eaf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
@@ -18,25 +18,9 @@
package org.apache.giraph.edge;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.partition.Partition;
import org.apache.giraph.utils.ByteArrayVertexIdEdges;
-import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.utils.ProgressableUtils;
-import org.apache.giraph.utils.Trimmable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.util.Progressable;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.MapMaker;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentMap;
/**
* Collects incoming edges for vertices owned by this worker.
@@ -45,50 +29,8 @@ import java.util.concurrent.ConcurrentMap;
* @param <V> Vertex value
* @param <E> Edge value
*/
-public class EdgeStore<I extends WritableComparable,
- V extends Writable, E extends Writable> {
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(EdgeStore.class);
- /** Service worker. */
- private CentralizedServiceWorker<I, V, E> service;
- /** Giraph configuration. */
- private ImmutableClassesGiraphConfiguration<I, V, E> configuration;
- /** Progressable to report progress. */
- private Progressable progressable;
- /** Map used to temporarily store incoming edges. */
- private ConcurrentMap<Integer,
- ConcurrentMap<I, OutEdges<I, E>>> transientEdges;
- /**
- * Whether the chosen {@link OutEdges} implementation allows for Edge
- * reuse.
- */
- private boolean reuseEdgeObjects;
- /**
- * Whether the {@link OutEdges} class used during input is different
- * from the one used during computation.
- */
- private boolean useInputOutEdges;
-
- /**
- * Constructor.
- *
- * @param service Service worker
- * @param configuration Configuration
- * @param progressable Progressable
- */
- public EdgeStore(
- CentralizedServiceWorker<I, V, E> service,
- ImmutableClassesGiraphConfiguration<I, V, E> configuration,
- Progressable progressable) {
- this.service = service;
- this.configuration = configuration;
- this.progressable = progressable;
- transientEdges = new MapMaker().concurrencyLevel(
- configuration.getNettyServerExecutionConcurrency()).makeMap();
- reuseEdgeObjects = configuration.reuseEdgeObjects();
- useInputOutEdges = configuration.useInputOutEdges();
- }
-
+public interface EdgeStore<I extends WritableComparable,
+ V extends Writable, E extends Writable> {
/**
* Add edges belonging to a given partition on this worker.
* Note: This method is thread-safe.
@@ -96,150 +38,11 @@ public class EdgeStore<I extends WritableComparable,
* @param partitionId Partition id for the incoming edges.
* @param edges Incoming edges
*/
- public void addPartitionEdges(
- int partitionId, ByteArrayVertexIdEdges<I, E> edges) {
- ConcurrentMap<I, OutEdges<I, E>> partitionEdges =
- transientEdges.get(partitionId);
- if (partitionEdges == null) {
- ConcurrentMap<I, OutEdges<I, E>> newPartitionEdges =
- new MapMaker().concurrencyLevel(
- configuration.getNettyServerExecutionConcurrency()).makeMap();
- partitionEdges = transientEdges.putIfAbsent(partitionId,
- newPartitionEdges);
- if (partitionEdges == null) {
- partitionEdges = newPartitionEdges;
- }
- }
- ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator =
- edges.getVertexIdEdgeIterator();
- while (vertexIdEdgeIterator.hasNext()) {
- vertexIdEdgeIterator.next();
- I vertexId = vertexIdEdgeIterator.getCurrentVertexId();
- Edge<I, E> edge = reuseEdgeObjects ?
- vertexIdEdgeIterator.getCurrentEdge() :
- vertexIdEdgeIterator.releaseCurrentEdge();
- OutEdges<I, E> outEdges = partitionEdges.get(vertexId);
- if (outEdges == null) {
- OutEdges<I, E> newOutEdges =
- configuration.createAndInitializeInputOutEdges();
- outEdges = partitionEdges.putIfAbsent(vertexId, newOutEdges);
- if (outEdges == null) {
- outEdges = newOutEdges;
- // Since we had to use the vertex id as a new key in the map,
- // we need to release the object.
- vertexIdEdgeIterator.releaseCurrentVertexId();
- }
- }
- synchronized (outEdges) {
- outEdges.add(edge);
- }
- }
- }
-
- /**
- * Convert the input edges to the {@link OutEdges} data structure used
- * for computation (if different).
- *
- * @param inputEdges Input edges
- * @return Compute edges
- */
- private OutEdges<I, E> convertInputToComputeEdges(
- OutEdges<I, E> inputEdges) {
- if (!useInputOutEdges) {
- return inputEdges;
- } else {
- return configuration.createAndInitializeOutEdges(inputEdges);
- }
- }
+ void addPartitionEdges(int partitionId, ByteArrayVertexIdEdges<I, E> edges);
/**
* Move all edges from temporary storage to their source vertices.
* Note: this method is not thread-safe.
*/
- public void moveEdgesToVertices() {
- final boolean createSourceVertex = configuration.
- getCreateSourceVertex();
- if (transientEdges.isEmpty()) {
- if (LOG.isInfoEnabled()) {
- LOG.info("moveEdgesToVertices: No edges to move");
- }
- return;
- }
-
- if (LOG.isInfoEnabled()) {
- LOG.info("moveEdgesToVertices: Moving incoming edges to vertices.");
- }
-
- final BlockingQueue<Integer> partitionIdQueue =
- new ArrayBlockingQueue<Integer>(transientEdges.size());
- partitionIdQueue.addAll(transientEdges.keySet());
- int numThreads = configuration.getNumInputSplitsThreads();
-
- CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
- @Override
- public Callable<Void> newCallable(int callableId) {
- return new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- Integer partitionId;
- while ((partitionId = partitionIdQueue.poll()) != null) {
- Partition<I, V, E> partition =
- service.getPartitionStore().getOrCreatePartition(partitionId);
- ConcurrentMap<I, OutEdges<I, E>> partitionEdges =
- transientEdges.remove(partitionId);
- for (I vertexId : partitionEdges.keySet()) {
- OutEdges<I, E> outEdges = convertInputToComputeEdges(
- partitionEdges.remove(vertexId));
- Vertex<I, V, E> vertex = partition.getVertex(vertexId);
- // If the source vertex doesn't exist, create it. Otherwise,
- // just set the edges.
- if (vertex == null) {
- if (createSourceVertex) {
- // createVertex only if it is allowed by configuration
- vertex = configuration.createVertex();
- vertex.initialize(vertexId,
- configuration.createVertexValue(), outEdges);
- if (vertex instanceof Trimmable) {
- ((Trimmable) vertex).trim();
- }
- partition.putVertex(vertex);
- }
- } else {
- // A vertex may exist with or without edges initially
- // and optimize the case of no initial edges
- if (vertex.getNumEdges() == 0) {
- vertex.setEdges(outEdges);
- } else {
- for (Edge<I, E> edge : outEdges) {
- vertex.addEdge(edge);
- }
- }
- if (vertex instanceof Trimmable) {
- ((Trimmable) vertex).trim();
- }
- // Some Partition implementations (e.g. ByteArrayPartition)
- // require us to put back the vertex after modifying it.
- partition.saveVertex(vertex);
- }
- }
- // Some PartitionStore implementations
- // (e.g. DiskBackedPartitionStore) require us to put back the
- // partition after modifying it.
- service.getPartitionStore().putPartition(partition);
- }
- return null;
- }
- };
- }
- };
- ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
- "move-edges-%d", progressable);
-
- transientEdges.clear();
-
- if (LOG.isInfoEnabled()) {
- LOG.info("moveEdgesToVertices: Finished moving incoming edges to " +
- "vertices.");
- }
- }
+ void moveEdgesToVertices();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStoreFactory.java
new file mode 100644
index 0000000..cb47fd0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStoreFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Factory to create a new Edge Store
+ * @param <I> vertex id
+ * @param <V> vertex value
+ * @param <E> edge value
+ */
+public interface EdgeStoreFactory<I extends WritableComparable,
+ V extends Writable, E extends Writable> {
+
+ /**
+ * Creates new edge store.
+ *
+ * @return edge store
+ */
+ EdgeStore<I, V, E> newStore();
+
+ /**
+ * Implementation class should use this method of initialization
+ * of any required internal state.
+ *
+ * @param service Service to get partition mappings
+ * @param conf Configuration
+ * @param progressable Progressable
+ */
+ void initialize(CentralizedServiceWorker<I, V, E> service,
+ ImmutableClassesGiraphConfiguration<I, V, E> conf,
+ Progressable progressable);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/InMemoryEdgeStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/InMemoryEdgeStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/edge/InMemoryEdgeStoreFactory.java
new file mode 100644
index 0000000..d3d6997
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/InMemoryEdgeStoreFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.primitives.IntEdgeStore;
+import org.apache.giraph.edge.primitives.LongEdgeStore;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Edge store factory which produces message stores which hold all
+ * edges in memory. It creates primitive edges stores when vertex id is
+ * IntWritable or LongWritable
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("unchecked")
+public class InMemoryEdgeStoreFactory<I extends WritableComparable,
+ V extends Writable, E extends Writable>
+ implements EdgeStoreFactory<I, V, E> {
+ /** Service worker. */
+ protected CentralizedServiceWorker<I, V, E> service;
+ /** Giraph configuration. */
+ protected ImmutableClassesGiraphConfiguration<I, V, E> conf;
+ /** Progressable to report progress. */
+ protected Progressable progressable;
+
+ @Override
+ public EdgeStore<I, V, E> newStore() {
+ Class<I> vertexIdClass = conf.getVertexIdClass();
+ EdgeStore<I, V, E> edgeStore;
+ if (vertexIdClass.equals(IntWritable.class)) {
+ edgeStore = (EdgeStore<I, V, E>) new IntEdgeStore<>(
+ (CentralizedServiceWorker<IntWritable, V, E>) service,
+ (ImmutableClassesGiraphConfiguration<IntWritable, V, E>) conf,
+ progressable);
+ } else if (vertexIdClass.equals(LongWritable.class)) {
+ edgeStore = (EdgeStore<I, V, E>) new LongEdgeStore<>(
+ (CentralizedServiceWorker<LongWritable, V, E>) service,
+ (ImmutableClassesGiraphConfiguration<LongWritable, V, E>) conf,
+ progressable);
+ } else {
+ edgeStore = new SimpleEdgeStore<>(service, conf, progressable);
+ }
+ return edgeStore;
+ }
+
+ @Override
+ public void initialize(CentralizedServiceWorker<I, V, E> service,
+ ImmutableClassesGiraphConfiguration<I, V, E> conf,
+ Progressable progressable) {
+ this.service = service;
+ this.conf = conf;
+ this.progressable = progressable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java
new file mode 100644
index 0000000..6e2a74f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+
+import com.google.common.collect.MapMaker;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Simple in memory edge store which supports any type of ids.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public class SimpleEdgeStore<I extends WritableComparable,
+ V extends Writable, E extends Writable>
+ extends AbstractEdgeStore<I, V, E, I,
+ Map.Entry<I, OutEdges<I, E>>> {
+
+ /**
+ * Constructor.
+ *
+ * @param service Service worker
+ * @param configuration Configuration
+ * @param progressable Progressable
+ */
+ public SimpleEdgeStore(
+ CentralizedServiceWorker<I, V, E> service,
+ ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+ Progressable progressable) {
+ super(service, configuration, progressable);
+ }
+
+ @Override
+ protected I getVertexId(Map.Entry<I, OutEdges<I, E>> entry,
+ I representativeVertexId) {
+ return entry.getKey();
+ }
+
+ @Override
+ protected I createVertexId(Map.Entry<I, OutEdges<I, E>> entry) {
+ return entry.getKey();
+ }
+
+ @Override
+ protected ConcurrentMap<I, OutEdges<I, E>> getPartitionEdges(
+ int partitionId) {
+ ConcurrentMap<I, OutEdges<I, E>> partitionEdges =
+ (ConcurrentMap<I, OutEdges<I, E>>) transientEdges.get(partitionId);
+ if (partitionEdges == null) {
+ ConcurrentMap<I, OutEdges<I, E>> newPartitionEdges =
+ new MapMaker().concurrencyLevel(
+ configuration.getNettyServerExecutionConcurrency()).makeMap();
+ partitionEdges = (ConcurrentMap<I, OutEdges<I, E>>)
+ transientEdges.putIfAbsent(partitionId, newPartitionEdges);
+ if (partitionEdges == null) {
+ partitionEdges = newPartitionEdges;
+ }
+ }
+ return partitionEdges;
+ }
+
+ @Override
+ protected OutEdges<I, E> removePartitionEdges(
+ Map.Entry<I, OutEdges<I, E>> entry,
+ Map<I, OutEdges<I, E>> partitionEdges) {
+ return partitionEdges.put(entry.getKey(), null);
+ }
+
+ @Override
+ protected Iterator<Map.Entry<I, OutEdges<I, E>>>
+ getPartitionEdgesIterator(Map<I, OutEdges<I, E>> partitionEdges) {
+ return partitionEdges.entrySet().iterator();
+ }
+
+ @Override
+ protected OutEdges<I, E> getVertexOutEdges(
+ ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator,
+ Map<I, OutEdges<I, E>> partitionEdgesIn) {
+ ConcurrentMap<I, OutEdges<I, E>> partitionEdges =
+ (ConcurrentMap<I, OutEdges<I, E>>) partitionEdgesIn;
+ I vertexId = vertexIdEdgeIterator.getCurrentVertexId();
+ OutEdges<I, E> outEdges = partitionEdges.get(vertexId);
+ if (outEdges == null) {
+ OutEdges<I, E> newOutEdges =
+ configuration.createAndInitializeInputOutEdges();
+ outEdges = partitionEdges.putIfAbsent(vertexId, newOutEdges);
+ if (outEdges == null) {
+ outEdges = newOutEdges;
+ // Since we had to use the vertex id as a new key in the map,
+ // we need to release the object.
+ vertexIdEdgeIterator.releaseCurrentVertexId();
+ }
+ }
+ return outEdges;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java
new file mode 100644
index 0000000..c6b5051
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge.primitives;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.AbstractEdgeStore;
+import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Special edge store to be used when ids are IntWritable.
+ * Uses fastutil primitive maps in order to decrease number of objects and
+ * get better performance.
+ *
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public class IntEdgeStore<V extends Writable, E extends Writable>
+ extends AbstractEdgeStore<IntWritable, V, E, Integer,
+ Int2ObjectMap.Entry<OutEdges<IntWritable, E>>> {
+
+ /**
+ * Constructor.
+ *
+ * @param service Service worker
+ * @param configuration Configuration
+ * @param progressable Progressable
+ */
+ public IntEdgeStore(
+ CentralizedServiceWorker<IntWritable, V, E> service,
+ ImmutableClassesGiraphConfiguration<IntWritable, V, E> configuration,
+ Progressable progressable) {
+ super(service, configuration, progressable);
+ }
+
+ @Override
+ protected IntWritable getVertexId(
+ Int2ObjectMap.Entry<OutEdges<IntWritable, E>> entry,
+ IntWritable representativeVertexId) {
+ representativeVertexId.set(entry.getIntKey());
+ return representativeVertexId;
+ }
+
+ @Override
+ protected IntWritable createVertexId(
+ Int2ObjectMap.Entry<OutEdges<IntWritable, E>> entry) {
+ return new IntWritable(entry.getIntKey());
+ }
+
+ @Override
+ protected OutEdges<IntWritable, E> removePartitionEdges(
+ Int2ObjectMap.Entry<OutEdges<IntWritable, E>> entry,
+ Map<Integer, OutEdges<IntWritable, E>> partitionEdges) {
+ return partitionEdges.put(entry.getIntKey(), null);
+ }
+
+ @Override
+ protected Iterator<Int2ObjectMap.Entry<OutEdges<IntWritable, E>>>
+ getPartitionEdgesIterator(
+ Map<Integer, OutEdges<IntWritable, E>> partitionEdges) {
+ return ((Int2ObjectMap<OutEdges<IntWritable, E>>) partitionEdges)
+ .int2ObjectEntrySet()
+ .iterator();
+ }
+
+ @Override
+ protected Int2ObjectMap<OutEdges<IntWritable, E>> getPartitionEdges(
+ int partitionId) {
+ Int2ObjectMap<OutEdges<IntWritable, E>> partitionEdges =
+ (Int2ObjectMap<OutEdges<IntWritable, E>>)
+ transientEdges.get(partitionId);
+ if (partitionEdges == null) {
+ Int2ObjectMap<OutEdges<IntWritable, E>> newPartitionEdges =
+ Int2ObjectMaps.synchronize(
+ new Int2ObjectOpenHashMap<OutEdges<IntWritable, E>>());
+ partitionEdges = (Int2ObjectMap<OutEdges<IntWritable, E>>)
+ transientEdges.putIfAbsent(partitionId,
+ newPartitionEdges);
+ if (partitionEdges == null) {
+ partitionEdges = newPartitionEdges;
+ }
+ }
+ return partitionEdges;
+ }
+
+ @Override
+ protected OutEdges<IntWritable, E> getVertexOutEdges(
+ ByteArrayVertexIdEdges<IntWritable, E>.VertexIdEdgeIterator
+ vertexIdEdgeIterator,
+ Map<Integer, OutEdges<IntWritable, E>> partitionEdgesIn) {
+ Int2ObjectMap<OutEdges<IntWritable, E>> partitionEdges =
+ (Int2ObjectMap<OutEdges<IntWritable, E>>) partitionEdgesIn;
+ IntWritable vertexId = vertexIdEdgeIterator.getCurrentVertexId();
+ OutEdges<IntWritable, E> outEdges = partitionEdges.get(vertexId.get());
+ if (outEdges == null) {
+ synchronized (partitionEdges) {
+ outEdges = partitionEdges.get(vertexId.get());
+ if (outEdges == null) {
+ outEdges = configuration.createAndInitializeInputOutEdges();
+ partitionEdges.put(vertexId.get(), outEdges);
+ }
+ }
+ }
+ return outEdges;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
new file mode 100644
index 0000000..d4c44c7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge.primitives;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.AbstractEdgeStore;
+import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Special edge store to be used when ids are LongWritable.
+ * Uses fastutil primitive maps in order to decrease number of objects and
+ * get better performance.
+ *
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public class LongEdgeStore<V extends Writable, E extends Writable>
+ extends AbstractEdgeStore<LongWritable, V, E, Long,
+ Long2ObjectMap.Entry<OutEdges<LongWritable, E>>> {
+
+ /**
+ * Constructor.
+ *
+ * @param service Service worker
+ * @param configuration Configuration
+ * @param progressable Progressable
+ */
+ public LongEdgeStore(
+ CentralizedServiceWorker<LongWritable, V, E> service,
+ ImmutableClassesGiraphConfiguration<LongWritable, V, E> configuration,
+ Progressable progressable) {
+ super(service, configuration, progressable);
+ }
+
+ @Override
+ protected LongWritable getVertexId(
+ Long2ObjectMap.Entry<OutEdges<LongWritable, E>> entry,
+ LongWritable representativeVertexId) {
+ representativeVertexId.set(entry.getLongKey());
+ return representativeVertexId;
+ }
+
+ @Override
+ protected LongWritable createVertexId(
+ Long2ObjectMap.Entry<OutEdges<LongWritable, E>> entry) {
+ return new LongWritable(entry.getLongKey());
+ }
+
+
+ @Override
+ protected OutEdges<LongWritable, E> removePartitionEdges(
+ Long2ObjectMap.Entry<OutEdges<LongWritable, E>> entry,
+ Map<Long, OutEdges<LongWritable, E>> partitionEdges) {
+ return partitionEdges.put(entry.getLongKey(), null);
+ }
+
+ @Override
+ protected Iterator<Long2ObjectMap.Entry<OutEdges<LongWritable, E>>>
+ getPartitionEdgesIterator(
+ Map<Long, OutEdges<LongWritable, E>> partitionEdges) {
+ return ((Long2ObjectMap<OutEdges<LongWritable, E>>) partitionEdges)
+ .long2ObjectEntrySet()
+ .iterator();
+ }
+
+ @Override
+ protected Long2ObjectMap<OutEdges<LongWritable, E>> getPartitionEdges(
+ int partitionId) {
+ Long2ObjectMap<OutEdges<LongWritable, E>> partitionEdges =
+ (Long2ObjectMap<OutEdges<LongWritable, E>>)
+ transientEdges.get(partitionId);
+ if (partitionEdges == null) {
+ Long2ObjectMap<OutEdges<LongWritable, E>> newPartitionEdges =
+ Long2ObjectMaps.synchronize(
+ new Long2ObjectOpenHashMap<OutEdges<LongWritable, E>>());
+ partitionEdges = (Long2ObjectMap<OutEdges<LongWritable, E>>)
+ transientEdges.putIfAbsent(partitionId,
+ newPartitionEdges);
+ if (partitionEdges == null) {
+ partitionEdges = newPartitionEdges;
+ }
+ }
+ return partitionEdges;
+ }
+
+ @Override
+ protected OutEdges<LongWritable, E> getVertexOutEdges(
+ ByteArrayVertexIdEdges<LongWritable, E>.VertexIdEdgeIterator
+ vertexIdEdgeIterator,
+ Map<Long, OutEdges<LongWritable, E>> partitionEdgesIn) {
+ Long2ObjectMap<OutEdges<LongWritable, E>> partitionEdges =
+ (Long2ObjectMap<OutEdges<LongWritable, E>>) partitionEdgesIn;
+ LongWritable vertexId = vertexIdEdgeIterator.getCurrentVertexId();
+ OutEdges<LongWritable, E> outEdges = partitionEdges.get(vertexId.get());
+ if (outEdges == null) {
+ synchronized (partitionEdges) {
+ outEdges = partitionEdges.get(vertexId.get());
+ if (outEdges == null) {
+ outEdges = configuration.createAndInitializeInputOutEdges();
+ partitionEdges.put(vertexId.get(), outEdges);
+ }
+ }
+ }
+ return outEdges;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/primitives/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/package-info.java b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/package-info.java
new file mode 100644
index 0000000..81c5b6c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of edge stores specialized for certain type of vertex ids.
+ */
+package org.apache.giraph.edge.primitives;