You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2014/12/31 04:07:13 UTC

git commit: updated refs/heads/trunk to 78931c0

Repository: giraph
Updated Branches:
  refs/heads/trunk 5def2380c -> 78931c03f


GIRAPH-977: useMessageSizeEncoding is broken

Summary: When useMessageSizeEncoding is set to true, any application using messaging and no combiner will fail. It looks like writing lengths of messages was lost at some point, so we are just trying to read the lengths without ever writing them.

Test Plan: Added a test, it failed without the change but succeeded with it.

Reviewers: sergey.edunov, pavanka

Differential Revision: https://reviews.facebook.net/D30825


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/78931c03
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/78931c03
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/78931c03

Branch: refs/heads/trunk
Commit: 78931c03f36e2371bb1870d485be73e6bc2146a6
Parents: 5def238
Author: Maja Kabiljo <ma...@fb.com>
Authored: Tue Dec 30 18:51:55 2014 -0800
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Tue Dec 30 19:06:39 2014 -0800

----------------------------------------------------------------------
 CHANGELOG                                       |  2 +
 .../giraph/utils/ByteArrayVertexIdMessages.java | 41 ++++++++++++++++++++
 .../TestIntFloatPrimitiveMessageStores.java     | 27 ++++++++-----
 3 files changed, 60 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/78931c03/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 60f64d1..fbefa2f 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.2.0 - unreleased
+  GIRAPH-977: useMessageSizeEncoding is broken (majakabiljo)
+
   GIRAPH-976: More command line logging (majakabiljo)
   
   GIRAPH-972: Race condition in checkpointing (edunov)

http://git-wip-us.apache.org/repos/asf/giraph/blob/78931c03/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
index b3eca3e..daad860 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
@@ -97,6 +97,47 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
   }
 
   @Override
+  public void add(I vertexId, M message) {
+    if (!useMessageSizeEncoding) {
+      super.add(vertexId, message);
+    } else {
+      try {
+        vertexId.write(extendedDataOutput);
+        writeMessageWithSize(message);
+      } catch (IOException e) {
+        throw new IllegalStateException("add: IOException occurred");
+      }
+    }
+  }
+
+  @Override
+  public void add(byte[] serializedId, int idPos, M message) {
+    if (!useMessageSizeEncoding) {
+      super.add(serializedId, idPos, message);
+    } else {
+      try {
+        extendedDataOutput.write(serializedId, 0, idPos);
+        writeMessageWithSize(message);
+      } catch (IOException e) {
+        throw new IllegalStateException("add: IOException occurred");
+      }
+    }
+  }
+
+  /**
+   * Write a size of the message and message
+   *
+   * @param message Message to write
+   */
+  private void writeMessageWithSize(M message) throws IOException {
+    int pos = extendedDataOutput.getPos();
+    extendedDataOutput.skipBytes(4);
+    writeData(extendedDataOutput, message);
+    extendedDataOutput.writeInt(
+        pos, extendedDataOutput.getPos() - pos - 4);
+  }
+
+  @Override
   public ByteStructVertexIdMessageBytesIterator<I, M>
   getVertexIdMessageBytesIterator() {
     if (!useMessageSizeEncoding) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/78931c03/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
index 62a39db..b8137c0 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
@@ -28,6 +28,7 @@ import org.apache.giraph.combiner.FloatSumMessageCombiner;
 import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore;
 import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore;
 import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.TestMessageValueFactory;
 import org.apache.giraph.graph.BasicComputation;
@@ -52,6 +53,8 @@ public class TestIntFloatPrimitiveMessageStores {
   private static final int NUM_PARTITIONS = 2;
   private static CentralizedServiceWorker<IntWritable, Writable, Writable>
     service;
+  private static ImmutableClassesGiraphConfiguration<IntWritable, Writable,
+      Writable> conf;
 
   @Before
   public void prepare() throws IOException {
@@ -74,6 +77,10 @@ public class TestIntFloatPrimitiveMessageStores {
     Mockito.when(partition.getVertexCount()).thenReturn(Long.valueOf(1));
     Mockito.when(partitionStore.getOrCreatePartition(0)).thenReturn(partition);
     Mockito.when(partitionStore.getOrCreatePartition(1)).thenReturn(partition);
+
+    GiraphConfiguration initConf = new GiraphConfiguration();
+    initConf.setComputationClass(IntFloatNoOpComputation.class);
+    conf = new ImmutableClassesGiraphConfiguration(initConf);
   }
 
   private static class IntFloatNoOpComputation extends
@@ -85,20 +92,12 @@ public class TestIntFloatPrimitiveMessageStores {
     }
   }
 
-  private static ImmutableClassesGiraphConfiguration<IntWritable, Writable,
-    Writable> createIntFloatConf() {
-
-    GiraphConfiguration initConf = new GiraphConfiguration();
-    initConf.setComputationClass(IntFloatNoOpComputation.class);
-    return new ImmutableClassesGiraphConfiguration(initConf);
-  }
-
   private static ByteArrayVertexIdMessages<IntWritable, FloatWritable>
   createIntFloatMessages() {
     ByteArrayVertexIdMessages<IntWritable, FloatWritable> messages =
         new ByteArrayVertexIdMessages<IntWritable, FloatWritable>(
             new TestMessageValueFactory<FloatWritable>(FloatWritable.class));
-    messages.setConf(createIntFloatConf());
+    messages.setConf(conf);
     messages.initialize();
     return messages;
   }
@@ -149,7 +148,7 @@ public class TestIntFloatPrimitiveMessageStores {
     IntByteArrayMessageStore<FloatWritable> messageStore =
         new IntByteArrayMessageStore<FloatWritable>(new
             TestMessageValueFactory<FloatWritable>(FloatWritable.class),
-            service, createIntFloatConf());
+            service, conf);
     insertIntFloatMessages(messageStore);
 
     Iterable<FloatWritable> m0 =
@@ -173,4 +172,12 @@ public class TestIntFloatPrimitiveMessageStores {
     Assert.assertTrue(
         Iterables.isEmpty(messageStore.getVertexMessages(new IntWritable(3))));
   }
+
+  @Test
+  public void testIntByteArrayMessageStoreWithMessageEncoding() throws
+      IOException {
+    GiraphConstants.USE_MESSAGE_SIZE_ENCODING.set(conf, true);
+    testIntByteArrayMessageStore();
+    GiraphConstants.USE_MESSAGE_SIZE_ENCODING.set(conf, false);
+  }
 }