You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by pa...@apache.org on 2014/06/01 22:32:24 UTC

git commit: updated refs/heads/trunk to 9cedc7d

Repository: giraph
Updated Branches:
  refs/heads/trunk f0b6cddd3 -> 9cedc7d76


GIRAPH-873 : Specialized edge stores


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/9cedc7d7
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/9cedc7d7
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/9cedc7d7

Branch: refs/heads/trunk
Commit: 9cedc7d76f2bbe52b3d1cc4caf8024e730266f83
Parents: f0b6cdd
Author: Pavan Kumar <pa...@fb.com>
Authored: Sun Jun 1 13:29:38 2014 -0700
Committer: Pavan Kumar <pa...@fb.com>
Committed: Sun Jun 1 13:31:42 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../java/org/apache/giraph/comm/ServerData.java |   5 +-
 .../org/apache/giraph/conf/GiraphConstants.java |   9 +
 .../ImmutableClassesGiraphConfiguration.java    |  12 +
 .../apache/giraph/edge/AbstractEdgeStore.java   | 276 +++++++++++++++++++
 .../java/org/apache/giraph/edge/EdgeStore.java  | 205 +-------------
 .../apache/giraph/edge/EdgeStoreFactory.java    |  54 ++++
 .../giraph/edge/InMemoryEdgeStoreFactory.java   |  79 ++++++
 .../org/apache/giraph/edge/SimpleEdgeStore.java | 123 +++++++++
 .../giraph/edge/primitives/IntEdgeStore.java    | 133 +++++++++
 .../giraph/edge/primitives/LongEdgeStore.java   | 134 +++++++++
 .../giraph/edge/primitives/package-info.java    |  21 ++
 12 files changed, 851 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index d1663c4..36af911 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-873: Specialized edge stores 
+ 
   GIRAPH-898: Remove giraph-accumulo from Facebook profile (edunov via majakabiljo)
 
   GIRAPH-896: Fix memory leak in SuperstepMetricsRegistry (edunov via pavanka)  

http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 5a217d4..f0ecca2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -26,6 +26,7 @@ import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.EdgeStore;
+import org.apache.giraph.edge.EdgeStoreFactory;
 import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.partition.DiskBackedPartitionStore;
 import org.apache.giraph.partition.PartitionStore;
@@ -108,7 +109,9 @@ public class ServerData<I extends WritableComparable,
       partitionStore =
           new SimplePartitionStore<I, V, E>(conf, context);
     }
-    edgeStore = new EdgeStore<I, V, E>(service, conf, context);
+    EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory();
+    edgeStoreFactory.initialize(service, conf, context);
+    edgeStore = edgeStoreFactory.newStore();
     ownerAggregatorData = new OwnerAggregatorServerData(context, conf);
     allAggregatorData = new AllAggregatorServerData(context, conf);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 7f1317f..6b36418 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -23,6 +23,8 @@ import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.edge.EdgeStoreFactory;
+import org.apache.giraph.edge.InMemoryEdgeStoreFactory;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.factories.ComputationFactory;
 import org.apache.giraph.factories.DefaultComputationFactory;
@@ -94,6 +96,13 @@ public interface GiraphConstants {
           TypesHolder.class,
           "TypesHolder, used if Computation not set - optional");
 
+  /** Edge Store Factory */
+  ClassConfOption<EdgeStoreFactory> EDGE_STORE_FACTORY_CLASS =
+      ClassConfOption.create("giraph.edgeStoreFactoryClass",
+          InMemoryEdgeStoreFactory.class,
+          EdgeStoreFactory.class,
+          "Edge Store Factory class to use for creating edgeStore");
+
   /** Message Store Factory */
   ClassConfOption<MessageStoreFactory> MESSAGE_STORE_FACTORY_CLASS =
       ClassConfOption.create("giraph.messageStoreFactoryClass",

http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 2e8c935..95e029d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.edge.EdgeStoreFactory;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.edge.ReusableEdge;
 import org.apache.giraph.factories.ComputationFactory;
@@ -768,6 +769,17 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Create edge store factory
+   *
+   * @return edge store factory
+   */
+  public EdgeStoreFactory<I, V, E> createEdgeStoreFactory() {
+    Class<? extends EdgeStoreFactory> edgeStoreFactoryClass =
+        EDGE_STORE_FACTORY_CLASS.get(this);
+    return ReflectionUtils.newInstance(edgeStoreFactoryClass);
+  }
+
+  /**
    * Get the user's subclassed incoming message value class.
    *
    * @param <M> Message data

http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
new file mode 100644
index 0000000..80e909d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.collect.MapMaker;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Basic implementation of an edge store; extend this to easily define simple
+ * and primitive edge stores
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <K> Key corresponding to Vertex id
+ * @param <Et> Entry type
+ */
+public abstract class AbstractEdgeStore<I extends WritableComparable,
+  V extends Writable, E extends Writable, K, Et>
+  extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+  implements EdgeStore<I, V, E> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(AbstractEdgeStore.class);
+  /** Service worker. */
+  protected CentralizedServiceWorker<I, V, E> service;
+  /** Giraph configuration. */
+  protected ImmutableClassesGiraphConfiguration<I, V, E> configuration;
+  /** Progressable to report progress. */
+  protected Progressable progressable;
+  /** Map used to temporarily store incoming edges. */
+  protected ConcurrentMap<Integer, Map<K, OutEdges<I, E>>> transientEdges;
+  /**
+   * Whether the chosen {@link OutEdges} implementation allows for Edge
+   * reuse.
+   */
+  protected boolean reuseEdgeObjects;
+  /**
+   * Whether the {@link OutEdges} class used during input is different
+   * from the one used during computation.
+   */
+  protected boolean useInputOutEdges;
+
+  /**
+   * Constructor.
+   *
+   * @param service Service worker
+   * @param configuration Configuration
+   * @param progressable Progressable
+   */
+  public AbstractEdgeStore(
+    CentralizedServiceWorker<I, V, E> service,
+    ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+    Progressable progressable) {
+    this.service = service;
+    this.configuration = configuration;
+    this.progressable = progressable;
+    transientEdges = new MapMaker().concurrencyLevel(
+      configuration.getNettyServerExecutionConcurrency()).makeMap();
+    reuseEdgeObjects = configuration.reuseEdgeObjects();
+    useInputOutEdges = configuration.useInputOutEdges();
+  }
+
+  /**
+   * Get vertexId for a given key
+   *
+   * @param entry for vertexId key
+   * @param representativeVertexId representativeVertexId
+   * @return vertex Id
+   */
+  protected abstract I getVertexId(Et entry, I representativeVertexId);
+
+  /**
+   * Create vertexId from a given key
+   *
+   * @param entry for vertexId key
+   * @return new vertexId
+   */
+  protected abstract I createVertexId(Et entry);
+
+  /**
+   * Get the map of OutEdges, keyed by vertex key, for a given partition
+   *
+   * @param partitionId id of partition
+   * @return map of OutEdges for the partition
+   */
+  protected abstract Map<K, OutEdges<I, E>> getPartitionEdges(int partitionId);
+
+  /**
+   * Remove and return the OutEdges of a given entry from a partition's edges
+   *
+   * @param entry for vertexId key
+   * @param partitionEdges map of out-edges for vertices in a partition
+   * @return out edges
+   */
+  protected abstract OutEdges<I, E> removePartitionEdges(Et entry,
+    Map<K, OutEdges<I, E>> partitionEdges);
+
+  /**
+   * Get iterator for partition edges
+   *
+   * @param partitionEdges map of out-edges for vertices in a partition
+   * @return iterator
+   */
+  protected abstract Iterator<Et>
+  getPartitionEdgesIterator(Map<K, OutEdges<I, E>> partitionEdges);
+
+  /**
+   * Get out-edges for a given vertex
+   *
+   * @param vertexIdEdgeIterator vertex Id Edge iterator
+   * @param partitionEdgesIn map of out-edges for vertices in a partition
+   * @return out-edges for the vertex
+   */
+  protected abstract OutEdges<I, E> getVertexOutEdges(
+    ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator,
+    Map<K, OutEdges<I, E>> partitionEdgesIn);
+
+  @Override
+  public void addPartitionEdges(
+    int partitionId, ByteArrayVertexIdEdges<I, E> edges) {
+    Map<K, OutEdges<I, E>> partitionEdges = getPartitionEdges(partitionId);
+
+    ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator =
+        edges.getVertexIdEdgeIterator();
+    while (vertexIdEdgeIterator.hasNext()) {
+      vertexIdEdgeIterator.next();
+      Edge<I, E> edge = reuseEdgeObjects ?
+          vertexIdEdgeIterator.getCurrentEdge() :
+          vertexIdEdgeIterator.releaseCurrentEdge();
+      OutEdges<I, E> outEdges = getVertexOutEdges(vertexIdEdgeIterator,
+          partitionEdges);
+      synchronized (outEdges) {
+        outEdges.add(edge);
+      }
+    }
+  }
+
+  /**
+   * Convert the input edges to the {@link OutEdges} data structure used
+   * for computation (if different).
+   *
+   * @param inputEdges Input edges
+   * @return Compute edges
+   */
+  private OutEdges<I, E> convertInputToComputeEdges(
+    OutEdges<I, E> inputEdges) {
+    if (!useInputOutEdges) {
+      return inputEdges;
+    } else {
+      return configuration.createAndInitializeOutEdges(inputEdges);
+    }
+  }
+
+  @Override
+  public void moveEdgesToVertices() {
+    final boolean createSourceVertex = configuration.getCreateSourceVertex();
+    if (transientEdges.isEmpty()) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("moveEdgesToVertices: No edges to move");
+      }
+      return;
+    }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("moveEdgesToVertices: Moving incoming edges to vertices.");
+    }
+
+    final BlockingQueue<Integer> partitionIdQueue =
+        new ArrayBlockingQueue<>(transientEdges.size());
+    partitionIdQueue.addAll(transientEdges.keySet());
+    int numThreads = configuration.getNumInputSplitsThreads();
+
+    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+      @Override
+      public Callable<Void> newCallable(int callableId) {
+        return new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            Integer partitionId;
+            I representativeVertexId = configuration.createVertexId();
+            while ((partitionId = partitionIdQueue.poll()) != null) {
+              Partition<I, V, E> partition =
+                  service.getPartitionStore().getOrCreatePartition(partitionId);
+              Map<K, OutEdges<I, E>> partitionEdges =
+                  transientEdges.remove(partitionId);
+              Iterator<Et> iterator =
+                  getPartitionEdgesIterator(partitionEdges);
+              // process all vertices in given partition
+              while (iterator.hasNext()) {
+                Et entry = iterator.next();
+                I vertexId = getVertexId(entry,
+                    representativeVertexId);
+                OutEdges<I, E> outEdges = convertInputToComputeEdges(
+                    removePartitionEdges(entry, partitionEdges));
+                Vertex<I, V, E> vertex = partition.getVertex(vertexId);
+                // If the source vertex doesn't exist, create it. Otherwise,
+                // just set the edges.
+                if (vertex == null) {
+                  if (createSourceVertex) {
+                    // createVertex only if it is allowed by configuration
+                    vertex = configuration.createVertex();
+                    vertex.initialize(createVertexId(entry),
+                        configuration.createVertexValue(), outEdges);
+                    partition.putVertex(vertex);
+                  }
+                } else {
+                  // A vertex may exist with or without edges initially;
+                  // optimize the case of no initial edges
+                  if (vertex.getNumEdges() == 0) {
+                    vertex.setEdges(outEdges);
+                  } else {
+                    for (Edge<I, E> edge : outEdges) {
+                      vertex.addEdge(edge);
+                    }
+                  }
+                  // Some Partition implementations (e.g. ByteArrayPartition)
+                  // require us to put back the vertex after modifying it.
+                  partition.saveVertex(vertex);
+                }
+              }
+              // Some PartitionStore implementations
+              // (e.g. DiskBackedPartitionStore) require us to put back the
+              // partition after modifying it.
+              service.getPartitionStore().putPartition(partition);
+            }
+            return null;
+          }
+        };
+      }
+    };
+    ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
+        "move-edges-%d", progressable);
+
+    // remove all entries
+    transientEdges.clear();
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("moveEdgesToVertices: Finished moving incoming edges to " +
+          "vertices.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
index 57ad387..1150eaf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
@@ -18,25 +18,9 @@
 
 package org.apache.giraph.edge;
 
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.partition.Partition;
 import org.apache.giraph.utils.ByteArrayVertexIdEdges;
-import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.utils.ProgressableUtils;
-import org.apache.giraph.utils.Trimmable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.util.Progressable;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.MapMaker;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentMap;
 
 /**
  * Collects incoming edges for vertices owned by this worker.
@@ -45,50 +29,8 @@ import java.util.concurrent.ConcurrentMap;
  * @param <V> Vertex value
  * @param <E> Edge value
  */
-public class EdgeStore<I extends WritableComparable,
-    V extends Writable, E extends Writable> {
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(EdgeStore.class);
-  /** Service worker. */
-  private CentralizedServiceWorker<I, V, E> service;
-  /** Giraph configuration. */
-  private ImmutableClassesGiraphConfiguration<I, V, E> configuration;
-  /** Progressable to report progress. */
-  private Progressable progressable;
-  /** Map used to temporarily store incoming edges. */
-  private ConcurrentMap<Integer,
-      ConcurrentMap<I, OutEdges<I, E>>> transientEdges;
-  /**
-   * Whether the chosen {@link OutEdges} implementation allows for Edge
-   * reuse.
-   */
-  private boolean reuseEdgeObjects;
-  /**
-   * Whether the {@link OutEdges} class used during input is different
-   * from the one used during computation.
-   */
-  private boolean useInputOutEdges;
-
-  /**
-   * Constructor.
-   *
-   * @param service Service worker
-   * @param configuration Configuration
-   * @param progressable Progressable
-   */
-  public EdgeStore(
-      CentralizedServiceWorker<I, V, E> service,
-      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
-      Progressable progressable) {
-    this.service = service;
-    this.configuration = configuration;
-    this.progressable = progressable;
-    transientEdges = new MapMaker().concurrencyLevel(
-        configuration.getNettyServerExecutionConcurrency()).makeMap();
-    reuseEdgeObjects = configuration.reuseEdgeObjects();
-    useInputOutEdges = configuration.useInputOutEdges();
-  }
-
+public interface EdgeStore<I extends WritableComparable,
+   V extends Writable, E extends Writable> {
   /**
    * Add edges belonging to a given partition on this worker.
    * Note: This method is thread-safe.
@@ -96,150 +38,11 @@ public class EdgeStore<I extends WritableComparable,
    * @param partitionId Partition id for the incoming edges.
    * @param edges Incoming edges
    */
-  public void addPartitionEdges(
-      int partitionId, ByteArrayVertexIdEdges<I, E> edges) {
-    ConcurrentMap<I, OutEdges<I, E>> partitionEdges =
-        transientEdges.get(partitionId);
-    if (partitionEdges == null) {
-      ConcurrentMap<I, OutEdges<I, E>> newPartitionEdges =
-          new MapMaker().concurrencyLevel(
-              configuration.getNettyServerExecutionConcurrency()).makeMap();
-      partitionEdges = transientEdges.putIfAbsent(partitionId,
-          newPartitionEdges);
-      if (partitionEdges == null) {
-        partitionEdges = newPartitionEdges;
-      }
-    }
-    ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator =
-        edges.getVertexIdEdgeIterator();
-    while (vertexIdEdgeIterator.hasNext()) {
-      vertexIdEdgeIterator.next();
-      I vertexId = vertexIdEdgeIterator.getCurrentVertexId();
-      Edge<I, E> edge = reuseEdgeObjects ?
-          vertexIdEdgeIterator.getCurrentEdge() :
-          vertexIdEdgeIterator.releaseCurrentEdge();
-      OutEdges<I, E> outEdges = partitionEdges.get(vertexId);
-      if (outEdges == null) {
-        OutEdges<I, E> newOutEdges =
-            configuration.createAndInitializeInputOutEdges();
-        outEdges = partitionEdges.putIfAbsent(vertexId, newOutEdges);
-        if (outEdges == null) {
-          outEdges = newOutEdges;
-          // Since we had to use the vertex id as a new key in the map,
-          // we need to release the object.
-          vertexIdEdgeIterator.releaseCurrentVertexId();
-        }
-      }
-      synchronized (outEdges) {
-        outEdges.add(edge);
-      }
-    }
-  }
-
-  /**
-   * Convert the input edges to the {@link OutEdges} data structure used
-   * for computation (if different).
-   *
-   * @param inputEdges Input edges
-   * @return Compute edges
-   */
-  private OutEdges<I, E> convertInputToComputeEdges(
-      OutEdges<I, E> inputEdges) {
-    if (!useInputOutEdges) {
-      return inputEdges;
-    } else {
-      return configuration.createAndInitializeOutEdges(inputEdges);
-    }
-  }
+  void addPartitionEdges(int partitionId, ByteArrayVertexIdEdges<I, E> edges);
 
   /**
    * Move all edges from temporary storage to their source vertices.
    * Note: this method is not thread-safe.
    */
-  public void moveEdgesToVertices() {
-    final boolean createSourceVertex = configuration.
-        getCreateSourceVertex();
-    if (transientEdges.isEmpty()) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("moveEdgesToVertices: No edges to move");
-      }
-      return;
-    }
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("moveEdgesToVertices: Moving incoming edges to vertices.");
-    }
-
-    final BlockingQueue<Integer> partitionIdQueue =
-        new ArrayBlockingQueue<Integer>(transientEdges.size());
-    partitionIdQueue.addAll(transientEdges.keySet());
-    int numThreads = configuration.getNumInputSplitsThreads();
-
-    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
-      @Override
-      public Callable<Void> newCallable(int callableId) {
-        return new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            Integer partitionId;
-            while ((partitionId = partitionIdQueue.poll()) != null) {
-              Partition<I, V, E> partition =
-                  service.getPartitionStore().getOrCreatePartition(partitionId);
-              ConcurrentMap<I, OutEdges<I, E>> partitionEdges =
-                  transientEdges.remove(partitionId);
-              for (I vertexId : partitionEdges.keySet()) {
-                OutEdges<I, E> outEdges = convertInputToComputeEdges(
-                    partitionEdges.remove(vertexId));
-                Vertex<I, V, E> vertex = partition.getVertex(vertexId);
-                // If the source vertex doesn't exist, create it. Otherwise,
-                // just set the edges.
-                if (vertex == null) {
-                  if (createSourceVertex) {
-                    // createVertex only if it is allowed by configuration
-                    vertex = configuration.createVertex();
-                    vertex.initialize(vertexId,
-                        configuration.createVertexValue(), outEdges);
-                    if (vertex instanceof Trimmable) {
-                      ((Trimmable) vertex).trim();
-                    }
-                    partition.putVertex(vertex);
-                  }
-                } else {
-                  // A vertex may exist with or without edges initially
-                  // and optimize the case of no initial edges
-                  if (vertex.getNumEdges() == 0) {
-                    vertex.setEdges(outEdges);
-                  } else {
-                    for (Edge<I, E> edge : outEdges) {
-                      vertex.addEdge(edge);
-                    }
-                  }
-                  if (vertex instanceof Trimmable) {
-                    ((Trimmable) vertex).trim();
-                  }
-                  // Some Partition implementations (e.g. ByteArrayPartition)
-                  // require us to put back the vertex after modifying it.
-                  partition.saveVertex(vertex);
-                }
-              }
-              // Some PartitionStore implementations
-              // (e.g. DiskBackedPartitionStore) require us to put back the
-              // partition after modifying it.
-              service.getPartitionStore().putPartition(partition);
-            }
-            return null;
-          }
-        };
-      }
-    };
-    ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
-        "move-edges-%d", progressable);
-
-    transientEdges.clear();
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("moveEdgesToVertices: Finished moving incoming edges to " +
-          "vertices.");
-    }
-  }
+  void moveEdgesToVertices();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStoreFactory.java
new file mode 100644
index 0000000..cb47fd0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStoreFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Factory to create a new Edge Store
+ * @param <I> vertex id
+ * @param <V> vertex value
+ * @param <E> edge value
+ */
+public interface EdgeStoreFactory<I extends WritableComparable,
+  V extends Writable, E extends Writable> {
+
+  /**
+   * Creates new edge store.
+   *
+   * @return edge store
+   */
+  EdgeStore<I, V, E> newStore();
+
+  /**
+   * Implementation class should use this method for initialization
+   * of any required internal state.
+   *
+   * @param service Service to get partition mappings
+   * @param conf Configuration
+   * @param progressable Progressable
+   */
+  void initialize(CentralizedServiceWorker<I, V, E> service,
+    ImmutableClassesGiraphConfiguration<I, V, E> conf,
+    Progressable progressable);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/InMemoryEdgeStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/InMemoryEdgeStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/edge/InMemoryEdgeStoreFactory.java
new file mode 100644
index 0000000..d3d6997
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/InMemoryEdgeStoreFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.primitives.IntEdgeStore;
+import org.apache.giraph.edge.primitives.LongEdgeStore;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * {@link EdgeStoreFactory} whose edge stores keep all edges in memory.
+ * Vertex ids of type IntWritable or LongWritable get a specialized
+ * primitive edge store; any other id type gets a {@link SimpleEdgeStore}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("unchecked")
+public class InMemoryEdgeStoreFactory<I extends WritableComparable,
+  V extends Writable, E extends Writable>
+  implements EdgeStoreFactory<I, V, E> {
+  /** Service worker. */
+  protected CentralizedServiceWorker<I, V, E> service;
+  /** Giraph configuration. */
+  protected ImmutableClassesGiraphConfiguration<I, V, E> conf;
+  /** Progressable to report progress. */
+  protected Progressable progressable;
+
+  @Override
+  public EdgeStore<I, V, E> newStore() {
+    Class<I> idClass = conf.getVertexIdClass();
+    if (idClass.equals(IntWritable.class)) {
+      // IntWritable ids: hand out the int-specialized store.
+      return (EdgeStore<I, V, E>) new IntEdgeStore<>(
+          (CentralizedServiceWorker<IntWritable, V, E>) service,
+          (ImmutableClassesGiraphConfiguration<IntWritable, V, E>) conf,
+          progressable);
+    }
+    if (idClass.equals(LongWritable.class)) {
+      return (EdgeStore<I, V, E>) new LongEdgeStore<>(
+          (CentralizedServiceWorker<LongWritable, V, E>) service,
+          (ImmutableClassesGiraphConfiguration<LongWritable, V, E>) conf,
+          progressable);
+    }
+    // Every other id type falls back to the generic store.
+    return new SimpleEdgeStore<>(service, conf, progressable);
+  }
+
+  @Override
+  public void initialize(CentralizedServiceWorker<I, V, E> service,
+    ImmutableClassesGiraphConfiguration<I, V, E> conf,
+    Progressable progressable) {
+    this.conf = conf;
+    this.service = service;
+    this.progressable = progressable;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java
new file mode 100644
index 0000000..6e2a74f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+
+import com.google.common.collect.MapMaker;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Simple in memory edge store which supports any type of ids.
+ * The edges of each partition are kept in a {@link ConcurrentMap} keyed
+ * directly by the vertex id object.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public class SimpleEdgeStore<I extends WritableComparable,
+  V extends Writable, E extends Writable>
+  extends AbstractEdgeStore<I, V, E, I,
+  Map.Entry<I, OutEdges<I, E>>> {
+
+  /**
+   * Constructor.
+   *
+   * @param service Service worker
+   * @param configuration Configuration
+   * @param progressable Progressable
+   */
+  public SimpleEdgeStore(
+    CentralizedServiceWorker<I, V, E> service,
+    ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+    Progressable progressable) {
+    super(service, configuration, progressable);
+  }
+
+  /**
+   * Returns the vertex id stored as the entry's key.
+   *
+   * @param entry Entry of a partition edge map
+   * @param representativeVertexId Unused here, since the key already is a
+   *                               vertex id object
+   * @return The entry's key
+   */
+  @Override
+  protected I getVertexId(Map.Entry<I, OutEdges<I, E>> entry,
+    I representativeVertexId) {
+    return entry.getKey();
+  }
+
+  /**
+   * Returns the entry's key itself; no new id object is allocated.
+   *
+   * @param entry Entry of a partition edge map
+   * @return The entry's key
+   */
+  @Override
+  protected I createVertexId(Map.Entry<I, OutEdges<I, E>> entry) {
+    return entry.getKey();
+  }
+
+  /**
+   * Returns the edge map registered for a partition in transientEdges,
+   * creating and registering a new one if none exists yet.
+   *
+   * @param partitionId Id of the partition
+   * @return Map from vertex id to out-edges for that partition
+   */
+  @Override
+  protected ConcurrentMap<I, OutEdges<I, E>> getPartitionEdges(
+    int partitionId) {
+    ConcurrentMap<I, OutEdges<I, E>> partitionEdges =
+        (ConcurrentMap<I, OutEdges<I, E>>) transientEdges.get(partitionId);
+    if (partitionEdges == null) {
+      ConcurrentMap<I, OutEdges<I, E>> newPartitionEdges =
+          new MapMaker().concurrencyLevel(
+              configuration.getNettyServerExecutionConcurrency()).makeMap();
+      // putIfAbsent returns a non-null map when one was registered in the
+      // meantime; in that case the freshly built map is discarded.
+      partitionEdges = (ConcurrentMap<I, OutEdges<I, E>>)
+          transientEdges.putIfAbsent(partitionId, newPartitionEdges);
+      if (partitionEdges == null) {
+        partitionEdges = newPartitionEdges;
+      }
+    }
+    return partitionEdges;
+  }
+
+  /**
+   * Removes the entry's vertex from the partition map and returns its edges.
+   *
+   * @param entry Entry whose key identifies the vertex to remove
+   * @param partitionEdges Map of out-edges for the vertices of one partition
+   * @return Out-edges previously mapped to the vertex, or null if none
+   */
+  @Override
+  protected OutEdges<I, E> removePartitionEdges(
+      Map.Entry<I, OutEdges<I, E>> entry,
+      Map<I, OutEdges<I, E>> partitionEdges) {
+    // The maps built by getPartitionEdges() come from MapMaker and do not
+    // accept null values (put(key, null) throws NullPointerException), so
+    // the mapping has to be removed rather than overwritten with null.
+    return partitionEdges.remove(entry.getKey());
+  }
+
+  /**
+   * Returns an iterator over the entries of a partition edge map.
+   *
+   * @param partitionEdges Map of out-edges for the vertices of one partition
+   * @return Iterator over the map's entry set
+   */
+  @Override
+  protected Iterator<Map.Entry<I, OutEdges<I, E>>>
+  getPartitionEdgesIterator(Map<I, OutEdges<I, E>> partitionEdges) {
+    return partitionEdges.entrySet().iterator();
+  }
+
+  /**
+   * Returns the out-edges mapped to the iterator's current vertex id,
+   * creating them through the configuration when the vertex has no entry.
+   *
+   * @param vertexIdEdgeIterator Iterator positioned on the vertex of interest
+   * @param partitionEdgesIn Partition edge map; cast to ConcurrentMap here
+   * @return Out-edges registered for the current vertex id
+   */
+  @Override
+  protected OutEdges<I, E> getVertexOutEdges(
+      ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator,
+      Map<I, OutEdges<I, E>> partitionEdgesIn) {
+    ConcurrentMap<I, OutEdges<I, E>> partitionEdges =
+        (ConcurrentMap<I, OutEdges<I, E>>) partitionEdgesIn;
+    I vertexId = vertexIdEdgeIterator.getCurrentVertexId();
+    OutEdges<I, E> outEdges = partitionEdges.get(vertexId);
+    if (outEdges == null) {
+      OutEdges<I, E> newOutEdges =
+          configuration.createAndInitializeInputOutEdges();
+      outEdges = partitionEdges.putIfAbsent(vertexId, newOutEdges);
+      if (outEdges == null) {
+        outEdges = newOutEdges;
+        // Since we had to use the vertex id as a new key in the map,
+        // we need to release the object.
+        vertexIdEdgeIterator.releaseCurrentVertexId();
+      }
+    }
+    return outEdges;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java
new file mode 100644
index 0000000..c6b5051
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge.primitives;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.AbstractEdgeStore;
+import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Edge store specialized for vertex ids of type IntWritable.
+ * Keeps per-partition edges in fastutil maps keyed by the primitive int id,
+ * to reduce the number of objects and get better performance.
+ *
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public class IntEdgeStore<V extends Writable, E extends Writable>
+  extends AbstractEdgeStore<IntWritable, V, E, Integer,
+  Int2ObjectMap.Entry<OutEdges<IntWritable, E>>> {
+
+  /**
+   * Constructor.
+   *
+   * @param service       Service worker
+   * @param configuration Configuration
+   * @param progressable  Progressable
+   */
+  public IntEdgeStore(
+      CentralizedServiceWorker<IntWritable, V, E> service,
+      ImmutableClassesGiraphConfiguration<IntWritable, V, E> configuration,
+      Progressable progressable) {
+    super(service, configuration, progressable);
+  }
+
+  @Override
+  protected IntWritable getVertexId(
+    Int2ObjectMap.Entry<OutEdges<IntWritable, E>> entry,
+    IntWritable representativeVertexId) {
+    // Write the key into the supplied writable and hand that back.
+    int key = entry.getIntKey();
+    representativeVertexId.set(key);
+    return representativeVertexId;
+  }
+
+  @Override
+  protected IntWritable createVertexId(
+    Int2ObjectMap.Entry<OutEdges<IntWritable, E>> entry) {
+    int key = entry.getIntKey();
+    return new IntWritable(key);
+  }
+
+  @Override
+  protected OutEdges<IntWritable, E> removePartitionEdges(
+    Int2ObjectMap.Entry<OutEdges<IntWritable, E>> entry,
+    Map<Integer, OutEdges<IntWritable, E>> partitionEdges) {
+    // Overwrite the value with null; put() returns the previous out-edges.
+    Integer key = entry.getIntKey();
+    return partitionEdges.put(key, null);
+  }
+
+  @Override
+  protected Iterator<Int2ObjectMap.Entry<OutEdges<IntWritable, E>>>
+  getPartitionEdgesIterator(
+    Map<Integer, OutEdges<IntWritable, E>> partitionEdges) {
+    Int2ObjectMap<OutEdges<IntWritable, E>> intKeyedEdges =
+        (Int2ObjectMap<OutEdges<IntWritable, E>>) partitionEdges;
+    return intKeyedEdges.int2ObjectEntrySet().iterator();
+  }
+
+  @Override
+  protected Int2ObjectMap<OutEdges<IntWritable, E>> getPartitionEdges(
+      int partitionId) {
+    Int2ObjectMap<OutEdges<IntWritable, E>> existing =
+        (Int2ObjectMap<OutEdges<IntWritable, E>>)
+            transientEdges.get(partitionId);
+    if (existing != null) {
+      return existing;
+    }
+    Int2ObjectMap<OutEdges<IntWritable, E>> created =
+        Int2ObjectMaps.synchronize(
+            new Int2ObjectOpenHashMap<OutEdges<IntWritable, E>>());
+    // putIfAbsent returns a non-null map when one was registered in the
+    // meantime; that one wins and the new map is dropped.
+    Int2ObjectMap<OutEdges<IntWritable, E>> registered =
+        (Int2ObjectMap<OutEdges<IntWritable, E>>)
+            transientEdges.putIfAbsent(partitionId, created);
+    return registered == null ? created : registered;
+  }
+
+  @Override
+  protected OutEdges<IntWritable, E> getVertexOutEdges(
+      ByteArrayVertexIdEdges<IntWritable, E>.VertexIdEdgeIterator
+          vertexIdEdgeIterator,
+      Map<Integer, OutEdges<IntWritable, E>> partitionEdgesIn) {
+    Int2ObjectMap<OutEdges<IntWritable, E>> partitionEdges =
+        (Int2ObjectMap<OutEdges<IntWritable, E>>) partitionEdgesIn;
+    int id = vertexIdEdgeIterator.getCurrentVertexId().get();
+    OutEdges<IntWritable, E> outEdges = partitionEdges.get(id);
+    if (outEdges != null) {
+      return outEdges;
+    }
+    synchronized (partitionEdges) {
+      // Look again while holding the map's monitor before creating.
+      outEdges = partitionEdges.get(id);
+      if (outEdges == null) {
+        outEdges = configuration.createAndInitializeInputOutEdges();
+        partitionEdges.put(id, outEdges);
+      }
+    }
+    return outEdges;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
new file mode 100644
index 0000000..d4c44c7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge.primitives;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.AbstractEdgeStore;
+import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Edge store specialized for vertex ids of type LongWritable.
+ * Keeps per-partition edges in fastutil maps keyed by the primitive long id,
+ * to reduce the number of objects and get better performance.
+ *
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public class LongEdgeStore<V extends Writable, E extends Writable>
+  extends AbstractEdgeStore<LongWritable, V, E, Long,
+  Long2ObjectMap.Entry<OutEdges<LongWritable, E>>> {
+
+  /**
+   * Constructor.
+   *
+   * @param service Service worker
+   * @param configuration Configuration
+   * @param progressable Progressable
+   */
+  public LongEdgeStore(
+    CentralizedServiceWorker<LongWritable, V, E> service,
+    ImmutableClassesGiraphConfiguration<LongWritable, V, E> configuration,
+    Progressable progressable) {
+    super(service, configuration, progressable);
+  }
+
+  @Override
+  protected LongWritable getVertexId(
+    Long2ObjectMap.Entry<OutEdges<LongWritable, E>> entry,
+    LongWritable representativeVertexId) {
+    // Write the key into the supplied writable and hand that back.
+    long key = entry.getLongKey();
+    representativeVertexId.set(key);
+    return representativeVertexId;
+  }
+
+  @Override
+  protected LongWritable createVertexId(
+    Long2ObjectMap.Entry<OutEdges<LongWritable, E>> entry) {
+    long key = entry.getLongKey();
+    return new LongWritable(key);
+  }
+
+  @Override
+  protected OutEdges<LongWritable, E> removePartitionEdges(
+      Long2ObjectMap.Entry<OutEdges<LongWritable, E>> entry,
+      Map<Long, OutEdges<LongWritable, E>> partitionEdges) {
+    // Overwrite the value with null; put() returns the previous out-edges.
+    Long key = entry.getLongKey();
+    return partitionEdges.put(key, null);
+  }
+
+  @Override
+  protected Iterator<Long2ObjectMap.Entry<OutEdges<LongWritable, E>>>
+  getPartitionEdgesIterator(
+      Map<Long, OutEdges<LongWritable, E>> partitionEdges) {
+    Long2ObjectMap<OutEdges<LongWritable, E>> longKeyedEdges =
+        (Long2ObjectMap<OutEdges<LongWritable, E>>) partitionEdges;
+    return longKeyedEdges.long2ObjectEntrySet().iterator();
+  }
+
+  @Override
+  protected Long2ObjectMap<OutEdges<LongWritable, E>> getPartitionEdges(
+    int partitionId) {
+    Long2ObjectMap<OutEdges<LongWritable, E>> existing =
+        (Long2ObjectMap<OutEdges<LongWritable, E>>)
+            transientEdges.get(partitionId);
+    if (existing != null) {
+      return existing;
+    }
+    Long2ObjectMap<OutEdges<LongWritable, E>> created =
+        Long2ObjectMaps.synchronize(
+            new Long2ObjectOpenHashMap<OutEdges<LongWritable, E>>());
+    // putIfAbsent returns a non-null map when one was registered in the
+    // meantime; that one wins and the new map is dropped.
+    Long2ObjectMap<OutEdges<LongWritable, E>> registered =
+        (Long2ObjectMap<OutEdges<LongWritable, E>>)
+            transientEdges.putIfAbsent(partitionId, created);
+    return registered == null ? created : registered;
+  }
+
+  @Override
+  protected OutEdges<LongWritable, E> getVertexOutEdges(
+    ByteArrayVertexIdEdges<LongWritable, E>.VertexIdEdgeIterator
+      vertexIdEdgeIterator,
+    Map<Long, OutEdges<LongWritable, E>> partitionEdgesIn) {
+    Long2ObjectMap<OutEdges<LongWritable, E>> partitionEdges =
+        (Long2ObjectMap<OutEdges<LongWritable, E>>) partitionEdgesIn;
+    long id = vertexIdEdgeIterator.getCurrentVertexId().get();
+    OutEdges<LongWritable, E> outEdges = partitionEdges.get(id);
+    if (outEdges != null) {
+      return outEdges;
+    }
+    synchronized (partitionEdges) {
+      // Look again while holding the map's monitor before creating.
+      outEdges = partitionEdges.get(id);
+      if (outEdges == null) {
+        outEdges = configuration.createAndInitializeInputOutEdges();
+        partitionEdges.put(id, outEdges);
+      }
+    }
+    return outEdges;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/primitives/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/package-info.java b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/package-info.java
new file mode 100644
index 0000000..81c5b6c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of edge stores specialized for certain type of vertex ids.
+ */
+package org.apache.giraph.edge.primitives;