You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ik...@apache.org on 2016/03/16 00:48:59 UTC

git commit: updated refs/heads/trunk to 4170eeb

Repository: giraph
Updated Branches:
  refs/heads/trunk fafecee71 -> 4170eeb05


unsafe byte readers/writers

Summary: using unsafe readers/writers

Test Plan:
tested on PageRank app, and Fanout computation. In both cases, there is a ~20% speedup

JIRA: https://issues.apache.org/jira/browse/GIRAPH-1049

Reviewers: sergey.edunov, maja.kabiljo, dionysis.logothetis, ikabiljo

Reviewed By: ikabiljo

Differential Revision: https://reviews.facebook.net/D55509


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/4170eeb0
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/4170eeb0
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/4170eeb0

Branch: refs/heads/trunk
Commit: 4170eeb054ba1414eceb08ec6c0fbdbdb17eb5c2
Parents: fafecee
Author: spupyrev <sp...@fb.com>
Authored: Tue Mar 15 16:47:15 2016 -0700
Committer: Igor Kabiljo <ik...@fb.com>
Committed: Tue Mar 15 16:48:42 2016 -0700

----------------------------------------------------------------------
 .../api/local/InternalMessageStore.java         | 46 ++++++++++++--------
 1 file changed, 29 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/4170eeb0/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java
index 6c0cccb..92d9821 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java
@@ -34,8 +34,8 @@ import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.types.ops.TypeOps;
 import org.apache.giraph.types.ops.TypeOpsUtils;
-import org.apache.giraph.utils.ExtendedByteArrayDataInput;
-import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
@@ -69,7 +69,7 @@ interface InternalMessageStore
   abstract class InternalConcurrentMessageStore
       <I extends WritableComparable, M extends Writable, R>
       implements InternalMessageStore<I, M> {
-    protected final ConcurrentHashMap<I, R> received =
+    private final ConcurrentHashMap<I, R> received =
         new ConcurrentHashMap<>();
 
     private final Class<I> idClass;
@@ -102,6 +102,10 @@ interface InternalMessageStore
       return value;
     }
 
+    R removeFor(I id) {
+      return received.remove(id);
+    }
+
     abstract R createNewReceiver();
 
     @Override
@@ -118,8 +122,9 @@ interface InternalMessageStore
 
     public static <I extends WritableComparable, M extends Writable>
     InternalMessageStore<I, M> createMessageStore(
-        final ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
-        final MessageClasses<I, M> messageClasses) {
+      final ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
+      final MessageClasses<I, M> messageClasses
+    ) {
       MessageCombiner<? super I, M> combiner =
           messageClasses.createMessageCombiner(conf);
       if (combiner != null) {
@@ -132,8 +137,9 @@ interface InternalMessageStore
             messageClasses.createMessageValueFactory(conf));
       } else {
         return new InternalByteMessageStore<>(
-            conf.getVertexIdClass(),
-            messageClasses.createMessageValueFactory(conf));
+          conf.getVertexIdClass(),
+          messageClasses.createMessageValueFactory(conf),
+          conf);
       }
     }
 
@@ -175,7 +181,7 @@ interface InternalMessageStore
 
     @Override
     public Iterable<M> takeMessages(I id) {
-      M message = received.remove(id);
+      M message = removeFor(id);
       if (message != null) {
         return Collections.singleton(message);
       } else {
@@ -206,18 +212,22 @@ interface InternalMessageStore
   static class InternalByteMessageStore
       <I extends WritableComparable, M extends Writable>
       extends InternalConcurrentMessageStore<I, M,
-          ExtendedByteArrayDataOutput> {
+          ExtendedDataOutput> {
     private final MessageValueFactory<M> messageFactory;
+    private final ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
 
     public InternalByteMessageStore(
-        Class<I> idClass, MessageValueFactory<M> messageFactory) {
+      Class<I> idClass, MessageValueFactory<M> messageFactory,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> conf
+    ) {
       super(idClass);
       this.messageFactory = messageFactory;
+      this.conf = conf;
     }
 
     @Override
     public Iterable<M> takeMessages(I id) {
-      final ExtendedByteArrayDataOutput out = received.remove(id);
+      final ExtendedDataOutput out = removeFor(id);
       if (out == null) {
         return null;
       }
@@ -225,8 +235,10 @@ interface InternalMessageStore
       return new Iterable<M>() {
         @Override
         public Iterator<M> iterator() {
-          final ExtendedByteArrayDataInput in = new ExtendedByteArrayDataInput(
-              out.getByteArray(), 0, out.getPos());
+          final ExtendedDataInput in = conf.createExtendedDataInput(
+            out.getByteArray(), 0, out.getPos()
+          );
+
           final M message = messageFactory.newInstance();
           return new AbstractIterator<M>() {
             @Override
@@ -248,7 +260,7 @@ interface InternalMessageStore
 
     @Override
     public void sendMessage(I id, M message) {
-      ExtendedByteArrayDataOutput out = getReceiverFor(id);
+      ExtendedDataOutput out = getReceiverFor(id);
 
       synchronized (out) {
         try {
@@ -260,8 +272,8 @@ interface InternalMessageStore
     }
 
     @Override
-    ExtendedByteArrayDataOutput createNewReceiver() {
-      return new ExtendedByteArrayDataOutput();
+    ExtendedDataOutput createNewReceiver() {
+      return conf.createExtendedDataOutput();
     }
   }
 
@@ -285,7 +297,7 @@ interface InternalMessageStore
 
     @Override
     public Iterable<M> takeMessages(I id) {
-      final List<byte[]> out = received.remove(id);
+      final List<byte[]> out = removeFor(id);
       if (out == null) {
         return null;
       }