You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/11/27 21:01:45 UTC

svn commit: r1414361 [1/2] - in /giraph/trunk: ./ giraph/src/main/java/org/apache/giraph/ giraph/src/main/java/org/apache/giraph/comm/ giraph/src/main/java/org/apache/giraph/comm/messages/ giraph/src/main/java/org/apache/giraph/comm/netty/ giraph/src/m...

Author: aching
Date: Tue Nov 27 20:01:38 2012
New Revision: 1414361

URL: http://svn.apache.org/viewvc?rev=1414361&view=rev
Log:
GIRAPH-435: Serialize server messages for memory and less GC. (aching)

Added:
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
Removed:
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/CollectionOfMessagesPerVertexStore.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessageCollection.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/CollectionUtils.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/EmptyIterable.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/TestMessageStores.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Nov 27 20:01:38 2012
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-435: Serialize server messages for memory and less GC. (aching)
+
   GIRAPH-420: build formats in profiles where it works. (nitay)
 
   GIRAPH-421: Aggregate metrics up to master. (nitay)

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java Tue Nov 27 20:01:38 2012
@@ -31,6 +31,7 @@ import org.apache.giraph.graph.partition
 import org.apache.giraph.graph.partition.Partition;
 
 import org.apache.giraph.master.MasterObserver;
+
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -380,6 +381,17 @@ public class GiraphConfiguration extends
   /** Default number of messages that can be bulk sent during a flush */
   public static final int DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT = 2000;
 
+  /**
+   * Use message size encoding (typically better for complex objects,
+   * not meant for primitive wrapped messages)
+   */
+  public static final String USE_MESSAGE_SIZE_ENCODING =
+      "giraph.useMessageSizeEncoding";
+  /**
+   * By default, do not use message size encoding as it is experimental.
+   */
+  public static final boolean USE_MESSAGE_SIZE_ENCODING_DEFAULT = false;
+
   /** Number of channels used per server */
   public static final String CHANNELS_PER_SERVER =
       "giraph.channelsPerServer";
@@ -1102,4 +1114,15 @@ public class GiraphConfiguration extends
   public void useUnsafeSerialization(boolean useUnsafeSerialization) {
     setBoolean(USE_UNSAFE_SERIALIZATION, useUnsafeSerialization);
   }
+
+  /**
+   * Use message size encoding?  This feature may help with complex message
+   * objects.
+   *
+   * @return Whether to use message size encoding
+   */
+  public boolean useMessageSizeEncoding() {
+    return getBoolean(
+        USE_MESSAGE_SIZE_ENCODING, USE_MESSAGE_SIZE_ENCODING_DEFAULT);
+  }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java Tue Nov 27 20:01:38 2012
@@ -18,6 +18,9 @@
 
 package org.apache.giraph;
 
+
+import java.util.List;
+
 import org.apache.giraph.graph.AggregatorWriter;
 import org.apache.giraph.graph.Combiner;
 import org.apache.giraph.graph.DefaultMasterCompute;
@@ -51,8 +54,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
 
-import java.util.List;
-
 /**
  * The classes set here are immutable, the remaining configuration is mutable.
  * Classes are immutable and final to provide the best performance for

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java Tue Nov 27 20:01:38 2012
@@ -26,7 +26,7 @@ import org.apache.giraph.ImmutableClasse
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.graph.WorkerInfo;
 import org.apache.giraph.graph.partition.PartitionOwner;
-import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.PairList;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -42,7 +42,7 @@ import org.apache.hadoop.io.WritableComp
 public class SendMessageCache<I extends WritableComparable,
     M extends Writable> {
   /** Internal cache */
-  private final ByteArrayVertexIdMessageCollection<I, M>[] messageCache;
+  private final ByteArrayVertexIdMessages<I, M>[] messageCache;
   /** Number of messages in each partition */
   private final int[] messageCounts;
   /** List of partition ids belonging to a worker */
@@ -73,7 +73,7 @@ public class SendMessageCache<I extends 
       workerPartitionIds.add(partitionOwner.getPartitionId());
       maxPartition = Math.max(partitionOwner.getPartitionId(), maxPartition);
     }
-    messageCache = new ByteArrayVertexIdMessageCollection[maxPartition + 1];
+    messageCache = new ByteArrayVertexIdMessages[maxPartition + 1];
 
     int maxWorker = 0;
     for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
@@ -95,10 +95,10 @@ public class SendMessageCache<I extends 
   public int addMessage(WorkerInfo workerInfo,
     final int partitionId, I destVertexId, M message) {
     // Get the message collection
-    ByteArrayVertexIdMessageCollection<I, M> partitionMessages =
+    ByteArrayVertexIdMessages<I, M> partitionMessages =
         messageCache[partitionId];
     if (partitionMessages == null) {
-      partitionMessages = new ByteArrayVertexIdMessageCollection<I, M>();
+      partitionMessages = new ByteArrayVertexIdMessages<I, M>();
       partitionMessages.setConf(conf);
       partitionMessages.initialize();
       messageCache[partitionId] = partitionMessages;
@@ -115,13 +115,13 @@ public class SendMessageCache<I extends 
    *
    * @param workerInfo the address of the worker who owns the data
    *                   partitions that are receiving the messages
-   * @return List of pairs (partitionId, ByteArrayVertexIdMessageCollection),
+   * @return List of pairs (partitionId, ByteArrayVertexIdMessages),
    *         where all partition ids belong to workerInfo
    */
-  public PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>
+  public PairList<Integer, ByteArrayVertexIdMessages<I, M>>
   removeWorkerMessages(WorkerInfo workerInfo) {
-    PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>> workerMessages =
-        new PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>();
+    PairList<Integer, ByteArrayVertexIdMessages<I, M>> workerMessages =
+        new PairList<Integer, ByteArrayVertexIdMessages<I, M>>();
     workerMessages.initialize();
     for (Integer partitionId : workerPartitions.get(workerInfo)) {
       if (messageCache[partitionId] != null) {
@@ -139,15 +139,15 @@ public class SendMessageCache<I extends 
    * @return All vertex messages for all partitions
    */
   public PairList<WorkerInfo, PairList<
-      Integer, ByteArrayVertexIdMessageCollection<I, M>>> removeAllMessages() {
+      Integer, ByteArrayVertexIdMessages<I, M>>> removeAllMessages() {
     PairList<WorkerInfo, PairList<Integer,
-        ByteArrayVertexIdMessageCollection<I, M>>>
+        ByteArrayVertexIdMessages<I, M>>>
         allMessages = new PairList<WorkerInfo,
-        PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>>();
+        PairList<Integer, ByteArrayVertexIdMessages<I, M>>>();
     allMessages.initialize();
     for (WorkerInfo workerInfo : workerPartitions.keySet()) {
-      PairList<Integer, ByteArrayVertexIdMessageCollection<I,
-          M>> workerMessages =
+      PairList<Integer, ByteArrayVertexIdMessages<I,
+                M>> workerMessages =
           removeWorkerMessages(workerInfo);
       if (!workerMessages.isEmpty()) {
         allMessages.add(workerInfo, workerMessages);

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java Tue Nov 27 20:01:38 2012
@@ -18,13 +18,10 @@
 
 package org.apache.giraph.comm.messages;
 
+import java.io.IOException;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
 /**
  * Most basic message store with just add, get and clear operations
  *
@@ -34,21 +31,24 @@ import java.util.Map;
 public interface BasicMessageStore<I extends WritableComparable,
     M extends Writable> extends Writable {
   /**
-   * Adds messages
+   * Adds messages from one message store to another
    *
-   * @param messages Map of messages we want to add
-   * @throws IOException
+   * @param messageStore Message store whose messages are added to this
+   *                     object
+   * @throws java.io.IOException
    */
-  void addMessages(Map<I, Collection<M>> messages) throws IOException;
+  void addMessages(MessageStore<I, M> messageStore) throws IOException;
 
   /**
-   * Gets messages for a vertex.
+   * Gets messages for a vertex.  The lifetime of every message is only
+   * guaranteed until the iterator's next() method is called.  Do not re-use
+   * the messages.
    *
    * @param vertexId Vertex id for which we want to get messages
-   * @return Messages for vertex with required id
+   * @return Iterable of messages for a vertex id
    * @throws IOException
    */
-  Collection<M> getVertexMessages(I vertexId) throws IOException;
+  Iterable<M> getVertexMessages(I vertexId) throws IOException;
 
   /**
    * Clears messages for a vertex.

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java?rev=1414361&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java Tue Nov 27 20:01:38 2012
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import com.google.common.collect.Iterators;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.RepresentativeByteArrayIterable;
+import org.apache.giraph.utils.RepresentativeByteArrayIterator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Implementation of {@link SimpleMessageStore} where multiple messages are
+ * stored per vertex as byte arrays.  Used when there is no combiner provided.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
+    M extends Writable> extends SimpleMessageStore<I, M, ExtendedDataOutput> {
+  /**
+   * Constructor
+   *
+   * @param service Service worker
+   * @param config Hadoop configuration
+   */
+  public ByteArrayMessagesPerVertexStore(
+      CentralizedServiceWorker<I, ?, ?, M> service,
+      ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+    super(service, config);
+  }
+
+  /**
+   * Get the extended data output for a vertex id from the iterator, creating
+   * if necessary.  This method will take ownership of the vertex id from the
+   * iterator if necessary (if used in the partition map entry).
+   *
+   * @param partitionMap Partition map to look in
+   * @param iterator Special iterator that can release ownership of vertex ids
+   * @return Extended data output for this vertex id (created if necessary)
+   */
+  private ExtendedDataOutput getExtendedDataOutput(
+      ConcurrentMap<I, ExtendedDataOutput> partitionMap,
+      ByteArrayVertexIdMessages<I, M>.VertexIdIterator iterator) {
+    ExtendedDataOutput extendedDataOutput =
+        partitionMap.get(iterator.getCurrentVertexId());
+    if (extendedDataOutput == null) {
+      ExtendedDataOutput newExtendedDataOutput =
+          config.createExtendedDataOutput();
+      extendedDataOutput =
+          partitionMap.putIfAbsent(
+              iterator.releaseCurrentVertexId(),
+              newExtendedDataOutput);
+      if (extendedDataOutput == null) {
+        extendedDataOutput = newExtendedDataOutput;
+      }
+    }
+    return extendedDataOutput;
+  }
+
+  @Override
+  public void addPartitionMessages(
+      int partitionId,
+      ByteArrayVertexIdMessages<I, M> messages) throws IOException {
+    ConcurrentMap<I, ExtendedDataOutput> partitionMap =
+        getOrCreatePartitionMap(partitionId);
+    ByteArrayVertexIdMessages<I, M>.VertexIdMessageBytesIterator
+        vertexIdMessageBytesIterator =
+        messages.getVertexIdMessageBytesIterator();
+    // Try to copy the message buffer over rather than
+    // doing a deserialization of a message just to know its size.  This
+    // should be more efficient for complex objects where serialization is
+    // expensive.  If this type of iterator is not available, fall back to
+    // deserializing/serializing the messages
+    if (vertexIdMessageBytesIterator != null) {
+      while (vertexIdMessageBytesIterator.hasNext()) {
+        vertexIdMessageBytesIterator.next();
+        ExtendedDataOutput extendedDataOutput =
+            getExtendedDataOutput(partitionMap, vertexIdMessageBytesIterator);
+
+        synchronized (extendedDataOutput) {
+          vertexIdMessageBytesIterator.writeCurrentMessageBytes(
+              extendedDataOutput);
+        }
+      }
+    } else {
+      ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
+          vertexIdMessageIterator = messages.getVertexIdMessageIterator();
+      while (vertexIdMessageIterator.hasNext()) {
+        vertexIdMessageIterator.next();
+        ExtendedDataOutput extendedDataOutput =
+            getExtendedDataOutput(partitionMap, vertexIdMessageIterator);
+
+        synchronized (extendedDataOutput) {
+          vertexIdMessageIterator.getCurrentMessage().write(
+              extendedDataOutput);
+        }
+      }
+    }
+  }
+
+  /**
+   * Special iterable that recycles the message
+   */
+  private class MessagesIterable extends RepresentativeByteArrayIterable<M> {
+    /**
+     * Constructor
+     *
+     * @param buf Buffer
+     * @param off Offset to start in the buffer
+     * @param length Length of the buffer
+     */
+    private MessagesIterable(byte[] buf, int off, int length) {
+      super(config, buf, off, length);
+    }
+
+    @Override
+    protected M createWritable() {
+      return config.createMessageValue();
+    }
+  }
+
+  @Override
+  protected Iterable<M> getMessagesAsIterable(
+      ExtendedDataOutput extendedDataOutput) {
+
+    return new MessagesIterable(extendedDataOutput.getByteArray(), 0,
+        extendedDataOutput.getPos());
+  }
+
+  /**
+   * Special iterator only for counting messages
+   */
+  private class RepresentativeMessageIterator extends
+      RepresentativeByteArrayIterator<M> {
+    /**
+     * Constructor
+     *
+     * @param configuration Configuration
+     * @param buf buffer to read from
+     * @param off Offset into the buffer to start from
+     * @param length Length of the buffer
+     */
+    public RepresentativeMessageIterator(
+        ImmutableClassesGiraphConfiguration configuration,
+        byte[] buf, int off, int length) {
+      super(configuration, buf, off, length);
+    }
+
+    @Override
+    protected M createWritable() {
+      return config.createMessageValue();
+    }
+  }
+
+  @Override
+  protected int getNumberOfMessagesIn(
+      ConcurrentMap<I, ExtendedDataOutput> partitionMap) {
+    int numberOfMessages = 0;
+    for (ExtendedDataOutput extendedDataOutput : partitionMap.values()) {
+      numberOfMessages += Iterators.size(
+          new RepresentativeMessageIterator(config,
+              extendedDataOutput.getByteArray(), 0,
+              extendedDataOutput.getPos()));
+    }
+    return numberOfMessages;
+  }
+
+  @Override
+  protected void writeMessages(ExtendedDataOutput extendedDataOutput,
+      DataOutput out) throws IOException {
+    out.writeInt(extendedDataOutput.getPos());
+    out.write(
+        extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+  }
+
+  @Override
+  protected ExtendedDataOutput readFieldsForMessages(DataInput in) throws
+      IOException {
+    int byteArraySize = in.readInt();
+    byte[] messages = new byte[byteArraySize];
+    in.readFully(messages);
+    ExtendedDataOutput extendedDataOutput =
+        config.createExtendedDataOutput(messages, 0);
+    return extendedDataOutput;
+  }
+
+  /**
+   * Create new factory for this message store
+   *
+   * @param service Worker service
+   * @param config  Hadoop configuration
+   * @param <I>     Vertex id
+   * @param <M>     Message data
+   * @return Factory
+   */
+  public static <I extends WritableComparable, M extends Writable>
+  MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
+      CentralizedServiceWorker<I, ?, ?, M> service,
+      ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+    return new Factory<I, M>(service, config);
+  }
+
+  @Override
+  public void addMessages(MessageStore<I, M> messageStore) throws IOException {
+    if (messageStore instanceof ByteArrayMessagesPerVertexStore) {
+      ByteArrayMessagesPerVertexStore<I, M>
+          byteArrayMessagesPerVertexStore =
+          (ByteArrayMessagesPerVertexStore<I, M>) messageStore;
+      for (Map.Entry<Integer, ConcurrentMap<I, ExtendedDataOutput>>
+           partitionEntry : byteArrayMessagesPerVertexStore.map.entrySet()) {
+        for (Map.Entry<I, ExtendedDataOutput> vertexEntry :
+            partitionEntry.getValue().entrySet()) {
+          ConcurrentMap<I, ExtendedDataOutput> partitionMap =
+              getOrCreatePartitionMap(partitionEntry.getKey());
+          ExtendedDataOutput extendedDataOutput =
+              partitionMap.get(vertexEntry.getKey());
+          if (extendedDataOutput == null) {
+            ExtendedDataOutput newExtendedDataOutput =
+                config.createExtendedDataOutput();
+            extendedDataOutput =
+                partitionMap.putIfAbsent(vertexEntry.getKey(),
+                    newExtendedDataOutput);
+            if (extendedDataOutput == null) {
+              extendedDataOutput = newExtendedDataOutput;
+            }
+          }
+
+          // Add the messages
+          extendedDataOutput.write(vertexEntry.getValue().getByteArray(), 0,
+              vertexEntry.getValue().getPos());
+        }
+      }
+    } else {
+      throw new IllegalArgumentException("addMessages: Illegal argument " +
+          messageStore.getClass());
+    }
+  }
+
+  /**
+   * Factory for {@link ByteArrayMessagesPerVertexStore}
+   *
+   * @param <I> Vertex id
+   * @param <M> Message data
+   */
+  private static class Factory<I extends WritableComparable, M extends Writable>
+      implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
+    /** Service worker */
+    private final CentralizedServiceWorker<I, ?, ?, M> service;
+    /** Hadoop configuration */
+    private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
+
+    /**
+     * @param service Worker service
+     * @param config  Hadoop configuration
+     */
+    public Factory(CentralizedServiceWorker<I, ?, ?, M> service,
+        ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+      this.service = service;
+      this.config = config;
+    }
+
+    @Override
+    public MessageStoreByPartition<I, M> newStore() {
+      return new ByteArrayMessagesPerVertexStore(service, config);
+    }
+  }
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java Tue Nov 27 20:01:38 2012
@@ -18,21 +18,14 @@
 
 package org.apache.giraph.comm.messages;
 
-import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.CollectionUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
-import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentNavigableMap;
@@ -40,9 +33,14 @@ import java.util.concurrent.ConcurrentSk
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.RepresentativeByteArrayIterable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
 /**
- * Message storage with in memory map of messages and with support for
+ * Message storage with in-memory map of messages and with support for
  * flushing all the messages to the disk.
  *
  * @param <I> Vertex id
@@ -50,11 +48,15 @@ import java.util.concurrent.locks.Reentr
  */
 public class DiskBackedMessageStore<I extends WritableComparable,
     M extends Writable> implements FlushableMessageStore<I, M> {
-  /** In memory message map */
-  private volatile ConcurrentNavigableMap<I, Collection<M>> inMemoryMessages;
+  /**
+   * In-memory message map (must be sorted to ensure that the ids are
+   * ordered)
+   */
+  private volatile ConcurrentNavigableMap<I, ExtendedDataOutput>
+  inMemoryMessages;
   /** Hadoop configuration */
   private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
-  /** Counter for number of messages in memory */
+  /** Counter for number of messages in-memory */
   private final AtomicInteger numberOfMessagesInMemory;
   /** To keep vertex ids which we have messages for */
   private final Set<I> destinationVertices;
@@ -67,13 +69,15 @@ public class DiskBackedMessageStore<I ex
   private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
 
   /**
+   * Constructor.
+   *
    * @param config           Hadoop configuration
    * @param fileStoreFactory Factory for creating file stores when flushing
    */
   public DiskBackedMessageStore(
       ImmutableClassesGiraphConfiguration<I, ?, ?, M> config,
       MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
-    inMemoryMessages = new ConcurrentSkipListMap<I, Collection<M>>();
+    inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
     this.config = config;
     numberOfMessagesInMemory = new AtomicInteger(0);
     destinationVertices =
@@ -82,37 +86,90 @@ public class DiskBackedMessageStore<I ex
     this.fileStoreFactory = fileStoreFactory;
   }
 
-  @Override
-  public void addVertexMessages(I vertexId,
-      Collection<M> messages) throws IOException {
+  /**
+   * Add vertex messages
+   *
+   * @param vertexId Vertex id to use
+   * @param messages Messages to add (note that the lifetime of the messages
+   *                 is only until next() is called again)
+   * @return True if the vertex id ownership is taken by this method,
+   *         false otherwise
+   * @throws IOException
+   */
+  boolean addVertexMessages(I vertexId,
+                            Iterable<M> messages) throws IOException {
+    boolean ownsVertexId = false;
     destinationVertices.add(vertexId);
-
     rwLock.readLock().lock();
     try {
-      CollectionUtils.addConcurrent(vertexId, messages, inMemoryMessages);
-      numberOfMessagesInMemory.addAndGet(messages.size());
+      ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
+      if (extendedDataOutput == null) {
+        ExtendedDataOutput newExtendedDataOutput =
+            config.createExtendedDataOutput();
+        extendedDataOutput =
+            inMemoryMessages.putIfAbsent(vertexId, newExtendedDataOutput);
+        if (extendedDataOutput == null) {
+          ownsVertexId = true;
+          extendedDataOutput = newExtendedDataOutput;
+        }
+      }
+
+      for (M message : messages) {
+        message.write(extendedDataOutput);
+        numberOfMessagesInMemory.getAndIncrement();
+      }
     } finally {
       rwLock.readLock().unlock();
     }
+
+    return ownsVertexId;
   }
 
   @Override
-  public void addMessages(Map<I, Collection<M>> messages) throws IOException {
-    for (Entry<I, Collection<M>> entry : messages.entrySet()) {
-      addVertexMessages(entry.getKey(), entry.getValue());
+  public void addMessages(MessageStore<I, M> messageStore) throws
+      IOException {
+    for (I destinationVertex : messageStore.getDestinationVertices()) {
+      addVertexMessages(destinationVertex,
+          messageStore.getVertexMessages(destinationVertex));
+    }
+  }
+
+  /**
+   * Special iterable that recycles the message
+   */
+  private class MessageIterable extends RepresentativeByteArrayIterable<M> {
+    /**
+     * Constructor
+     *
+     * @param buf Buffer
+     * @param off Offset to start in the buffer
+     * @param length Length of the buffer
+     */
+    public MessageIterable(
+        byte[] buf, int off, int length) {
+      super(config, buf, off, length);
+    }
+
+    @Override
+    protected M createWritable() {
+      return config.createMessageValue();
     }
   }
 
   @Override
-  public Collection<M> getVertexMessages(I vertexId) throws IOException {
-    Collection<M> messages = inMemoryMessages.get(vertexId);
-    if (messages == null) {
-      messages = Lists.newArrayList();
+  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+    ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
+    if (extendedDataOutput == null) {
+      extendedDataOutput = config.createExtendedDataOutput();
     }
+    Iterable<M> combinedIterable = new MessageIterable(
+        extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+
     for (BasicMessageStore<I, M> fileStore : fileStores) {
-      messages.addAll(fileStore.getVertexMessages(vertexId));
+      combinedIterable = Iterables.concat(combinedIterable,
+          fileStore.getVertexMessages(vertexId));
     }
-    return messages;
+    return combinedIterable;
   }
 
   @Override
@@ -145,19 +202,94 @@ public class DiskBackedMessageStore<I ex
     fileStores.clear();
   }
 
+  /**
+   * Special temporary message store for passing along in-memory messages
+   */
+  private class TemporaryMessageStore implements MessageStore<I, M> {
+    /**
+     * In-memory message map (must be sorted to ensure that the ids are
+     * ordered)
+     */
+    private final ConcurrentNavigableMap<I, ExtendedDataOutput>
+    temporaryMessages;
+
+    /**
+     * Constructor.
+     *
+     * @param temporaryMessages Messages to be owned by this object
+     */
+    private TemporaryMessageStore(
+        ConcurrentNavigableMap<I, ExtendedDataOutput>
+            temporaryMessages) {
+      this.temporaryMessages = temporaryMessages;
+    }
+
+    @Override
+    public int getNumberOfMessages() {
+      throw new IllegalAccessError("getNumberOfMessages: Not supported");
+    }
+
+    @Override
+    public boolean hasMessagesForVertex(I vertexId) {
+      return temporaryMessages.containsKey(vertexId);
+    }
+
+    @Override
+    public Iterable<I> getDestinationVertices() {
+      return temporaryMessages.keySet();
+    }
+
+    @Override
+    public void addMessages(MessageStore<I, M> messageStore)
+      throws IOException {
+      throw new IllegalAccessError("addMessages: Not supported");
+    }
+
+    @Override
+    public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+      ExtendedDataOutput extendedDataOutput = temporaryMessages.get(vertexId);
+      if (extendedDataOutput == null) {
+        extendedDataOutput = config.createExtendedDataOutput();
+      }
+      return new MessageIterable(extendedDataOutput.getByteArray(), 0,
+          extendedDataOutput.getPos());
+    }
+
+    @Override
+    public void clearVertexMessages(I vertexId) throws IOException {
+      temporaryMessages.remove(vertexId);
+    }
+
+    @Override
+    public void clearAll() throws IOException {
+      temporaryMessages.clear();
+    }
+
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+      throw new IllegalAccessError("write: Not supported");
+    }
+
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+      throw new IllegalAccessError("readFields: Not supported");
+    }
+  }
+
   @Override
   public void flush() throws IOException {
-    ConcurrentNavigableMap<I, Collection<M>> messagesToFlush = null;
+    ConcurrentNavigableMap<I, ExtendedDataOutput> messagesToFlush = null;
     rwLock.writeLock().lock();
     try {
       messagesToFlush = inMemoryMessages;
-      inMemoryMessages = new ConcurrentSkipListMap<I, Collection<M>>();
+      inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
       numberOfMessagesInMemory.set(0);
     } finally {
       rwLock.writeLock().unlock();
     }
     BasicMessageStore<I, M> fileStore = fileStoreFactory.newStore();
-    fileStore.addMessages(messagesToFlush);
+    fileStore.addMessages(new TemporaryMessageStore(messagesToFlush));
+
     synchronized (fileStores) {
       fileStores.add(fileStore);
     }
@@ -171,14 +303,15 @@ public class DiskBackedMessageStore<I ex
       vertexId.write(out);
     }
 
-    // write in memory messages map
+    // write number of in-memory messages
+    out.writeInt(numberOfMessagesInMemory.get());
+
+    // write in-memory messages map
     out.writeInt(inMemoryMessages.size());
-    for (Entry<I, Collection<M>> entry : inMemoryMessages.entrySet()) {
+    for (Entry<I, ExtendedDataOutput> entry : inMemoryMessages.entrySet()) {
       entry.getKey().write(out);
-      out.writeInt(entry.getValue().size());
-      for (M message : entry.getValue()) {
-        message.write(out);
-      }
+      out.writeInt(entry.getValue().getPos());
+      out.write(entry.getValue().getByteArray(), 0, entry.getValue().getPos());
     }
 
     // write file stores
@@ -198,20 +331,19 @@ public class DiskBackedMessageStore<I ex
       destinationVertices.add(vertexId);
     }
 
-    // read in memory map
+    // read number of in-memory messages
+    numberOfMessagesInMemory.set(in.readInt());
+
+    // read in-memory map
     int mapSize = in.readInt();
     for (int m = 0; m < mapSize; m++) {
       I vertexId = config.createVertexId();
       vertexId.readFields(in);
-      int numMessages = in.readInt();
-      numberOfMessagesInMemory.addAndGet(numMessages);
-      List<M> messages = Lists.newArrayList();
-      for (int i = 0; i < numMessages; i++) {
-        M message = config.createMessageValue();
-        message.readFields(in);
-        messages.add(message);
-      }
-      inMemoryMessages.put(vertexId, messages);
+      int messageBytes = in.readInt();
+      byte[] buf = new byte[messageBytes];
+      ExtendedDataOutput extendedDataOutput =
+          config.createExtendedDataOutput(buf, messageBytes);
+      inMemoryMessages.put(vertexId, extendedDataOutput);
     }
 
     // read file stores

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java Tue Nov 27 20:01:38 2012
@@ -24,14 +24,13 @@ import com.google.common.collect.Maps;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.EmptyIterable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -76,57 +75,60 @@ public class DiskBackedMessageStoreByPar
   }
 
   @Override
-  public void addVertexMessages(I vertexId,
-      Collection<M> messages) throws IOException {
-    getMessageStore(vertexId).addVertexMessages(vertexId, messages);
-    checkMemory();
-  }
-
-  @Override
-  public void addMessages(Map<I, Collection<M>> messages) throws IOException {
-    for (Entry<I, Collection<M>> entry : messages.entrySet()) {
-      getMessageStore(entry.getKey()).addVertexMessages(
-          entry.getKey(), entry.getValue());
+  public void addPartitionMessages(
+      int partitionId,
+      ByteArrayVertexIdMessages<I, M> messages) throws IOException {
+    FlushableMessageStore<I, M> flushableMessageStore =
+        getMessageStore(partitionId);
+    if (flushableMessageStore instanceof DiskBackedMessageStore) {
+      DiskBackedMessageStore<I, M> diskBackedMessageStore =
+          (DiskBackedMessageStore<I, M>) flushableMessageStore;
+      ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
+          vertexIdMessageIterator =
+          messages.getVertexIdMessageIterator();
+      while (vertexIdMessageIterator.hasNext()) {
+        vertexIdMessageIterator.next();
+        boolean ownsVertexId =
+            diskBackedMessageStore.addVertexMessages(
+                vertexIdMessageIterator.getCurrentVertexId(),
+                Collections.singleton(
+                    vertexIdMessageIterator.getCurrentMessage()));
+        if (ownsVertexId) {
+          vertexIdMessageIterator.releaseCurrentVertexId();
+        }
+      }
+    } else {
+      throw new IllegalStateException("addPartitionMessages: Doesn't support " +
+          "class " + flushableMessageStore.getClass());
     }
     checkMemory();
   }
 
   @Override
-  public void addPartitionMessages(Map<I, Collection<M>> messages,
-      int partitionId) throws IOException {
-    getMessageStore(partitionId).addMessages(messages);
-    checkMemory();
-  }
-
-  @Override
-  public void addPartitionMessages(
-      ByteArrayVertexIdMessageCollection<I, M> messages,
-      int partitionId) throws IOException {
-    Map<I, Collection<M>> map = Maps.newHashMap();
-    ByteArrayVertexIdMessageCollection<I, M>.Iterator iterator =
-        messages.getIterator();
-    while (iterator.hasNext()) {
-      iterator.next();
-      I vertexId = iterator.getCurrentVertexId();
-      M message = iterator.getCurrentMessage();
-      Collection<M> currentMessages = map.get(vertexId);
-      if (currentMessages == null) {
-        currentMessages = Lists.newArrayList(message);
-        map.put(vertexId, currentMessages);
+  public void addMessages(MessageStore<I, M> messageStore) throws IOException {
+    for (I destinationVertex : messageStore.getDestinationVertices()) {
+      FlushableMessageStore<I, M> flushableMessageStore =
+          getMessageStore(destinationVertex);
+      if (flushableMessageStore instanceof DiskBackedMessageStore) {
+        DiskBackedMessageStore<I, M> diskBackedMessageStore =
+            (DiskBackedMessageStore<I, M>) flushableMessageStore;
+        Iterable<M> messages =
+            messageStore.getVertexMessages(destinationVertex);
+        diskBackedMessageStore.addVertexMessages(destinationVertex, messages);
       } else {
-        currentMessages.add(message);
+        throw new IllegalStateException("addMessages: Doesn't support " +
+            "class " + flushableMessageStore.getClass());
       }
     }
-    getMessageStore(partitionId).addMessages(map);
     checkMemory();
   }
 
   @Override
-  public Collection<M> getVertexMessages(I vertexId) throws IOException {
+  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
     if (hasMessagesForVertex(vertexId)) {
       return getMessageStore(vertexId).getVertexMessages(vertexId);
     } else {
-      return Collections.emptyList();
+      return EmptyIterable.<M>emptyIterable();
     }
   }
 

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStore.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStore.java Tue Nov 27 20:01:38 2012
@@ -21,9 +21,6 @@ package org.apache.giraph.comm.messages;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import java.io.IOException;
-import java.util.Collection;
-
 /**
  * Message store
  *
@@ -33,16 +30,6 @@ import java.util.Collection;
 public interface MessageStore<I extends WritableComparable,
     M extends Writable> extends BasicMessageStore<I, M> {
   /**
-   * Adds messages
-   *
-   * @param vertexId Vertex id for which the messages are
-   * @param messages Messages for the vertex
-   * @throws IOException
-   */
-  void addVertexMessages(I vertexId,
-      Collection<M> messages) throws IOException;
-
-  /**
    * Get number of messages in memory
    *
    * @return Number of messages in memory

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java Tue Nov 27 20:01:38 2012
@@ -21,9 +21,7 @@ package org.apache.giraph.comm.messages;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -38,22 +36,13 @@ public interface MessageStoreByPartition
   /**
    * Adds messages for partition
    *
-   * @param messages    Map of messages we want to add
    * @param partitionId Id of partition
-   * @throws IOException
-   */
-  void addPartitionMessages(Map<I, Collection<M>> messages,
-      int partitionId) throws IOException;
-
-  /**
-   * Adds messages for partition
-   *
    * @param messages    Collection of vertex ids and messages we want to add
-   * @param partitionId Id of partition
    * @throws IOException
    */
-  void addPartitionMessages(ByteArrayVertexIdMessageCollection<I, M> messages,
-      int partitionId) throws IOException;
+  void addPartitionMessages(
+      int partitionId, ByteArrayVertexIdMessages<I, M> messages)
+    throws IOException;
 
   /**
    * Gets vertex ids from selected partition which we have messages for

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java Tue Nov 27 20:01:38 2012
@@ -18,18 +18,18 @@
 
 package org.apache.giraph.comm.messages;
 
-import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.graph.Combiner;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.graph.Combiner;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
 /**
  * Implementation of {@link SimpleMessageStore} where we have a single
@@ -78,29 +78,53 @@ public class OneMessagePerVertexStore<I 
     return currentMessage;
   }
 
-  @Override
-  protected void addVertexMessagesToPartition(I vertexId,
-      Collection<M> messages,
+  /**
+   * Add a single message for vertex to a partition map
+   *
+   * @param vertexId Id of vertex which received message
+   * @param message Message to add
+   * @param partitionMap Partition map to add the message to
+   * @throws IOException
+   */
+  private void addVertexMessageToPartition(I vertexId, M message,
       ConcurrentMap<I, M> partitionMap) throws IOException {
     M currentMessage = getOrCreateCurrentMessage(vertexId, partitionMap);
     synchronized (currentMessage) {
-      for (M message : messages) {
-        combiner.combine(vertexId, currentMessage, message);
-      }
+      combiner.combine(vertexId, currentMessage, message);
     }
   }
 
   @Override
-  protected void addVertexMessageToPartition(I vertexId, M message,
-      ConcurrentMap<I, M> partitionMap) throws IOException {
-    M currentMessage = getOrCreateCurrentMessage(vertexId, partitionMap);
-    synchronized (currentMessage) {
-      combiner.combine(vertexId, currentMessage, message);
+  public void addPartitionMessages(
+      int partitionId,
+      ByteArrayVertexIdMessages<I, M> messages) throws IOException {
+    ConcurrentMap<I, M> partitionMap =
+        getOrCreatePartitionMap(partitionId);
+    ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
+        vertexIdMessageIterator = messages.getVertexIdMessageIterator();
+    // This loop is a little complicated as it is optimized to create
+    // as few vertex id and message objects as possible.
+    while (vertexIdMessageIterator.hasNext()) {
+      vertexIdMessageIterator.next();
+      I vertexId = vertexIdMessageIterator.getCurrentVertexId();
+      M currentMessage =
+          partitionMap.get(vertexIdMessageIterator.getCurrentVertexId());
+      if (currentMessage == null) {
+        M newMessage = combiner.createInitialMessage();
+        currentMessage = partitionMap.putIfAbsent(
+            vertexIdMessageIterator.releaseCurrentVertexId(), newMessage);
+        if (currentMessage == null) {
+          currentMessage = newMessage;
+        }
+      }
+      synchronized (currentMessage) {
+        combiner.combine(vertexId, currentMessage,
+            vertexIdMessageIterator.getCurrentMessage());
+      }
     }
   }
-
   @Override
-  protected Collection<M> getMessagesAsCollection(M message) {
+  protected Iterable<M> getMessagesAsIterable(M message) {
     return Collections.singleton(message);
   }
 
@@ -121,6 +145,27 @@ public class OneMessagePerVertexStore<I 
     return message;
   }
 
+  @Override
+  public void addMessages(MessageStore<I, M> messageStore) throws IOException {
+    if (messageStore instanceof OneMessagePerVertexStore) {
+      OneMessagePerVertexStore<I, M> oneMessagePerVertexStore =
+          (OneMessagePerVertexStore<I, M>) messageStore;
+      for (Map.Entry<Integer, ConcurrentMap<I, M>>
+          partitionEntry : oneMessagePerVertexStore.map.entrySet()) {
+        ConcurrentMap<I, M> partitionMap =
+              getOrCreatePartitionMap(partitionEntry.getKey());
+        for (Map.Entry<I, M> vertexEntry :
+            partitionEntry.getValue().entrySet()) {
+          addVertexMessageToPartition(vertexEntry.getKey(),
+              vertexEntry.getValue(), partitionMap);
+        }
+      }
+    } else {
+      throw new IllegalArgumentException("addMessages: Illegal argument " +
+          messageStore.getClass());
+    }
+  }
+
   /**
    * Create new factory for this message store
    *
@@ -138,7 +183,7 @@ public class OneMessagePerVertexStore<I 
   }
 
   /**
-   * Factory for {@link CollectionOfMessagesPerVertexStore}
+   * Factory for {@link OneMessagePerVertexStore}
    *
    * @param <I> Vertex id
    * @param <M> Message data

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java Tue Nov 27 20:01:38 2012
@@ -18,14 +18,9 @@
 
 package org.apache.giraph.comm.messages;
 
-import org.apache.giraph.GiraphConfiguration;
-import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
+import com.google.common.collect.Sets;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInput;
@@ -36,18 +31,21 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.SortedMap;
+import java.util.List;
+import java.util.SortedSet;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
 
 /**
  * Used for writing and reading collection of messages to the disk. {@link
- * #addMessages(java.util.Map)} should be called only once with the messages
- * we want to store.
+ * #addMessages(MessageStore)} should be called only once with
+ * the messages we want to store.
  * <p/>
  * It's optimized for retrieving messages in the natural order of vertex ids
  * they are sent to.
@@ -57,6 +55,9 @@ import java.util.concurrent.atomic.Atomi
  */
 public class SequentialFileMessageStore<I extends WritableComparable,
     M extends Writable> implements BasicMessageStore<I, M> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SequentialFileMessageStore.class);
   /** File in which we store data */
   private final File file;
   /** Configuration which we need for reading data */
@@ -88,39 +89,56 @@ public class SequentialFileMessageStore<
   }
 
   @Override
-  public void addMessages(Map<I, Collection<M>> messages) throws IOException {
-    SortedMap<I, Collection<M>> map;
-    if (!(messages instanceof SortedMap)) {
-      map = Maps.newTreeMap();
-      map.putAll(messages);
-    } else {
-      map = (SortedMap) messages;
-    }
-    writeToFile(map);
-  }
-
-  /**
-   * Writes messages to its file.
-   *
-   * @param messages Messages to write
-   * @throws IOException
-   */
-  private void writeToFile(SortedMap<I, Collection<M>> messages) throws
-      IOException {
+  public void addMessages(MessageStore<I, M> messageStore) throws IOException {
+    // Writes messages to its file
     if (file.exists()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("addMessages: Deleting " + file);
+      }
       file.delete();
     }
     file.createNewFile();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("addMessages: Creating " + file);
+    }
+
     DataOutputStream out = null;
 
     try {
       out = new DataOutputStream(
           new BufferedOutputStream(new FileOutputStream(file), bufferSize));
-      out.writeInt(messages.size());
-      for (Entry<I, Collection<M>> entry : messages.entrySet()) {
-        entry.getKey().write(out);
-        out.writeInt(entry.getValue().size());
-        for (M message : entry.getValue()) {
+      int destinationVertexIdCount =
+          Iterables.size(messageStore.getDestinationVertices());
+      out.writeInt(destinationVertexIdCount);
+
+      // Since the message store messages might not be sorted, sort them if
+      // necessary
+      SortedSet<I> sortedSet;
+      if (messageStore.getDestinationVertices() instanceof SortedSet) {
+        sortedSet = (SortedSet<I>) messageStore.getDestinationVertices();
+      } else {
+        sortedSet =
+            Sets.newTreeSet(messageStore.getDestinationVertices());
+        for (I destinationVertexId : messageStore.getDestinationVertices()) {
+          sortedSet.add(destinationVertexId);
+        }
+      }
+
+      // Dump the vertices and their messages in a sorted order
+      for (I destinationVertexId : sortedSet) {
+        destinationVertexId.write(out);
+        Iterable<M> messages =
+            messageStore.getVertexMessages(destinationVertexId);
+        int messageCount = Iterables.size(messages);
+        out.writeInt(messageCount);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("addMessages: For vertex id " + destinationVertexId +
+              ", messages = " + messageCount + " to file " + file);
+        }
+        for (M message : messages) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("addMessages: Wrote " + message + " to " + file);
+          }
           message.write(out);
         }
       }
@@ -142,8 +160,12 @@ public class SequentialFileMessageStore<
    * @throws IOException
    */
   @Override
-  public Collection<M> getVertexMessages(I vertexId) throws
+  public Iterable<M> getVertexMessages(I vertexId) throws
       IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getVertexMessages: Reading for vertex id " + vertexId +
+          " (currently " + currentVertexId + ") from " + file);
+    }
     if (in == null) {
       startReading();
     }
@@ -154,14 +176,14 @@ public class SequentialFileMessageStore<
     }
 
     if (nextVertexId == null || vertexId.compareTo(nextVertexId) < 0) {
-      return Collections.emptyList();
+      return EmptyIterable.<M>emptyIterable();
     }
+
     return readMessagesForCurrentVertex();
   }
 
   @Override
-  public void clearVertexMessages(I vertexId) throws IOException {
-  }
+  public void clearVertexMessages(I vertexId) throws IOException { }
 
   @Override
   public void clearAll() throws IOException {
@@ -213,6 +235,10 @@ public class SequentialFileMessageStore<
     in = new DataInputStream(
         new BufferedInputStream(new FileInputStream(file), bufferSize));
     verticesLeft = in.readInt();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("startReading: File " + file + " with " +
+          verticesLeft + " vertices left");
+    }
   }
 
   /**
@@ -262,10 +288,17 @@ public class SequentialFileMessageStore<
    */
   private Collection<M> readMessagesForCurrentVertex() throws IOException {
     int messagesSize = in.readInt();
-    ArrayList<M> messages = Lists.newArrayList();
+    List<M> messages = Lists.newArrayListWithCapacity(messagesSize);
     for (int i = 0; i < messagesSize; i++) {
       M message = config.createMessageValue();
-      message.readFields(in);
+      try {
+        message.readFields(in);
+      } catch (IOException e) {
+        throw new IllegalStateException("readMessagesForCurrentVertex: " +
+            "Failed to read message from " + i + " of " +
+            messagesSize + " for vertex id " + currentVertexId + " from " +
+            file, e);
+      }
       messages.add(message);
     }
     currentVertexDone();
@@ -291,6 +324,9 @@ public class SequentialFileMessageStore<
    * @throws IOException
    */
   private void endReading() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("endReading: Stopped reading " + file);
+    }
     if (in != null) {
       in.close();
       in = null;

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java Tue Nov 27 20:01:38 2012
@@ -18,24 +18,20 @@
 
 package org.apache.giraph.comm.messages;
 
-import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.Maps;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
-
-import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
 /**
  * Abstract class for {@link MessageStoreByPartition} which allows any kind
@@ -72,35 +68,12 @@ public abstract class SimpleMessageStore
   }
 
   /**
-   * Add collection of messages for vertex to a partition map
-   *
-   * @param vertexId Id of vertex which received messages
-   * @param messages Messages to add
-   * @param partitionMap Map which to add to
-   * @throws IOException
-   */
-  protected abstract void addVertexMessagesToPartition(I vertexId,
-      Collection<M> messages, ConcurrentMap<I, T> partitionMap) throws
-      IOException;
-
-  /**
-   * Add a single message for vertex to a partition map
-   *
-   * @param vertexId Id of vertex which received message
-   * @param message Message to add
-   * @param partitionMap Map which to add to
-   * @throws IOException
-   */
-  protected abstract void addVertexMessageToPartition(I vertexId,
-      M message, ConcurrentMap<I, T> partitionMap) throws IOException;
-
-  /**
-   * Get messages as collection from message storage
+   * Get messages as an iterable from message storage
    *
    * @param messages Message storage
-   * @return Messages as collection
+   * @return Messages as an iterable
    */
-  protected abstract Collection<M> getMessagesAsCollection(T messages);
+  protected abstract Iterable<M> getMessagesAsIterable(T messages);
 
   /**
    * Get number of messages in partition map
@@ -162,50 +135,6 @@ public abstract class SimpleMessageStore
   }
 
   @Override
-  public void addMessages(Map<I, Collection<M>> messages) throws IOException {
-    for (Map.Entry<I, Collection<M>> entry : messages.entrySet()) {
-      addVertexMessages(entry.getKey(), entry.getValue());
-    }
-  }
-
-  @Override
-  public void addVertexMessages(I vertexId,
-      Collection<M> messages) throws IOException {
-    int partitionId = getPartitionId(vertexId);
-    ConcurrentMap<I, T> partitionMap = getOrCreatePartitionMap(partitionId);
-    addVertexMessagesToPartition(vertexId, messages, partitionMap);
-  }
-
-  @Override
-  public void addPartitionMessages(Map<I, Collection<M>> messages,
-      int partitionId) throws IOException {
-    ConcurrentMap<I, T> partitionMap =
-        getOrCreatePartitionMap(partitionId);
-
-    for (Map.Entry<I, Collection<M>> entry : messages.entrySet()) {
-      addVertexMessagesToPartition(entry.getKey(), entry.getValue(),
-          partitionMap);
-    }
-  }
-
-  @Override
-  public void addPartitionMessages(ByteArrayVertexIdMessageCollection<I,
-      M> messages,
-      int partitionId) throws IOException {
-    ConcurrentMap<I, T> partitionMap =
-        getOrCreatePartitionMap(partitionId);
-
-    ByteArrayVertexIdMessageCollection<I, M>.Iterator iterator =
-        messages.getIterator();
-    while (iterator.hasNext()) {
-      iterator.next();
-      I vertexId = iterator.getCurrentVertexId();
-      M message = iterator.getCurrentMessage();
-      addVertexMessageToPartition(vertexId, message, partitionMap);
-    }
-  }
-
-  @Override
   public Iterable<I> getPartitionDestinationVertices(int partitionId) {
     ConcurrentMap<I, ?> partitionMap = map.get(partitionId);
     return (partitionMap == null) ? Collections.<I>emptyList() :
@@ -229,14 +158,14 @@ public abstract class SimpleMessageStore
   }
 
   @Override
-  public Collection<M> getVertexMessages(I vertexId) throws IOException {
+  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
     ConcurrentMap<I, T> partitionMap = map.get(getPartitionId(vertexId));
     if (partitionMap == null) {
       return Collections.<M>emptyList();
     }
     T messages = partitionMap.get(vertexId);
     return (messages == null) ? Collections.<M>emptyList() :
-        getMessagesAsCollection(messages);
+        getMessagesAsIterable(messages);
   }
 
   @Override

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java Tue Nov 27 20:01:38 2012
@@ -21,7 +21,6 @@ package org.apache.giraph.comm.netty;
 import com.google.common.collect.Lists;
 import com.google.common.collect.MapMaker;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collection;
@@ -32,15 +31,14 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.netty.handler.AddressRequestIdGenerator;
 import org.apache.giraph.comm.netty.handler.ClientRequestId;
-import org.apache.giraph.comm.netty.handler.RequestServerHandler;
-import org.apache.giraph.comm.netty.handler.ResponseClientHandler;
 import org.apache.giraph.comm.netty.handler.RequestEncoder;
 import org.apache.giraph.comm.netty.handler.RequestInfo;
+import org.apache.giraph.comm.netty.handler.RequestServerHandler;
+import org.apache.giraph.comm.netty.handler.ResponseClientHandler;
 /*if[HADOOP_NON_SECURE]
 else[HADOOP_NON_SECURE]*/
 import org.apache.giraph.comm.netty.handler.SaslClientHandler;
@@ -65,8 +63,11 @@ import org.jboss.netty.handler.codec.fra
 import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
 import org.jboss.netty.handler.execution.ExecutionHandler;
 import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
+
 import static org.jboss.netty.channel.Channels.pipeline;
 
+
+
 /**
  * Netty client for sending requests.  Thread-safe.
  */

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java Tue Nov 27 20:01:38 2012
@@ -17,9 +17,7 @@
  */
 package org.apache.giraph.comm.netty;
 
-import com.google.common.collect.Maps;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Map;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
@@ -45,7 +43,7 @@ import org.apache.giraph.graph.partition
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.ValueGauge;
-import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.PairList;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -146,7 +144,7 @@ public class NettyWorkerClientRequestPro
     // Send a request if the cache of outgoing message to
     // the remote worker 'workerInfo' is full enough to be flushed
     if (workerMessageCount >= maxMessagesPerWorker) {
-      PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>
+      PairList<Integer, ByteArrayVertexIdMessages<I, M>>
           workerMessages =
           sendMessageCache.removeWorkerMessages(workerInfo);
       WritableRequest writableRequest =
@@ -174,29 +172,41 @@ public class NettyWorkerClientRequestPro
     // Messages are stored separately
     MessageStoreByPartition<I, M> messageStore =
         serverData.getCurrentMessageStore();
-    Map<I, Collection<M>> map = Maps.newHashMap();
+    ByteArrayVertexIdMessages<I, M> vertexIdMessages =
+        new ByteArrayVertexIdMessages<I, M>();
+    vertexIdMessages.setConf(configuration);
+    vertexIdMessages.initialize();
     int messagesInMap = 0;
     for (I vertexId :
         messageStore.getPartitionDestinationVertices(partitionId)) {
       try {
-        Collection<M> messages = messageStore.getVertexMessages(vertexId);
-        map.put(vertexId, messages);
-        messagesInMap += messages.size();
+        // Messages cannot be re-used from this iterable, but add()
+        // serializes the message, making this safe
+        Iterable<M> messages = messageStore.getVertexMessages(vertexId);
+        for (M message : messages) {
+          vertexIdMessages.add(vertexId, message);
+          ++messagesInMap;
+        }
       } catch (IOException e) {
         throw new IllegalStateException(
             "sendVertexRequest: Got IOException ", e);
       }
       if (messagesInMap > maxMessagesPerWorker) {
         WritableRequest messagesRequest = new
-            SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
+            SendPartitionCurrentMessagesRequest<I, V, E, M>(
+            partitionId, vertexIdMessages);
         doRequest(workerInfo, messagesRequest);
-        map.clear();
+        vertexIdMessages =
+            new ByteArrayVertexIdMessages<I, M>();
+        vertexIdMessages.setConf(configuration);
+        vertexIdMessages.initialize();
         messagesInMap = 0;
       }
     }
-    if (!map.isEmpty()) {
+    if (messagesInMap > 0) {
       WritableRequest messagesRequest = new
-          SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
+          SendPartitionCurrentMessagesRequest<I, V, E, M>(
+          partitionId, vertexIdMessages);
       doRequest(workerInfo, messagesRequest);
     }
   }
@@ -322,10 +332,10 @@ public class NettyWorkerClientRequestPro
 
     // Execute the remaining sends messages (if any)
     PairList<WorkerInfo, PairList<Integer,
-        ByteArrayVertexIdMessageCollection<I, M>>>
+        ByteArrayVertexIdMessages<I, M>>>
         remainingMessageCache = sendMessageCache.removeAllMessages();
     PairList<WorkerInfo,
-        PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>>.Iterator
+        PairList<Integer, ByteArrayVertexIdMessages<I, M>>>.Iterator
         iterator = remainingMessageCache.getIterator();
     while (iterator.hasNext()) {
       iterator.next();

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java Tue Nov 27 20:01:38 2012
@@ -22,7 +22,7 @@ import org.apache.giraph.GiraphConfigura
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.messages.CollectionOfMessagesPerVertexStore;
+import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
 import org.apache.giraph.comm.messages.OneMessagePerVertexStore;
 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
 import org.apache.giraph.comm.WorkerServer;
@@ -116,10 +116,10 @@ public class NettyWorkerServer<I extends
       } else {
         if (LOG.isInfoEnabled()) {
           LOG.info("createMessageStoreFactory: " +
-              "Using CollectionOfMessagesPerVertexStore " +
+              "Using ByteArrayMessagesPerVertexStore " +
               "since there is no combiner");
         }
-        return CollectionOfMessagesPerVertexStore.newFactory(service, conf);
+        return ByteArrayMessagesPerVertexStore.newFactory(service, conf);
       }
     } else {
       int maxMessagesInMemory = conf.getInt(

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java Tue Nov 27 20:01:38 2012
@@ -18,19 +18,13 @@
 
 package org.apache.giraph.comm.requests;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import org.apache.giraph.comm.ServerData;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
 /**
  * Send a collection of vertex messages for a partition. It adds messages to
@@ -44,10 +38,10 @@ import java.util.Map.Entry;
 public class SendPartitionCurrentMessagesRequest<I extends WritableComparable,
   V extends Writable, E extends Writable, M extends Writable> extends
   WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
-  /** the destination partition for these vertices' messages*/
+  /** Destination partition for these vertices' messages */
   private int partitionId;
-  /** map of destination vertex ID's to message lists */
-  private Map<I, Collection<M>> vertexMessageMap;
+  /** Destination vertex IDs paired with their messages */
+  private ByteArrayVertexIdMessages<I, M> vertexIdMessageMap;
 
   /** Constructor used for reflection only */
   public SendPartitionCurrentMessagesRequest() { }
@@ -59,10 +53,10 @@ public class SendPartitionCurrentMessage
    * @param vertexIdMessages Map of messages to send
    */
   public SendPartitionCurrentMessagesRequest(int partitionId,
-    Map<I, Collection<M>> vertexIdMessages) {
+    ByteArrayVertexIdMessages<I, M> vertexIdMessages) {
     super();
     this.partitionId = partitionId;
-    this.vertexMessageMap = vertexIdMessages;
+    this.vertexIdMessageMap = vertexIdMessages;
   }
 
   @Override
@@ -73,42 +67,23 @@ public class SendPartitionCurrentMessage
   @Override
   public void readFieldsRequest(DataInput input) throws IOException {
     partitionId = input.readInt();
-    final int numVertices = input.readInt();
-    vertexMessageMap =
-      Maps.<I, Collection<M>>newHashMapWithExpectedSize(numVertices);
-    for (int i = 0; i < numVertices; ++i) {
-      I nextVertex = getConf().createVertexId();
-      nextVertex.readFields(input);
-      final int numMessages = input.readInt();
-      Collection<M> messagesForVertex =
-        Lists.<M>newArrayListWithExpectedSize(numMessages);
-      vertexMessageMap.put(nextVertex, messagesForVertex);
-      for (int j = 0; j < numMessages; ++j) {
-        M nextMessage = getConf().createMessageValue();
-        nextMessage.readFields(input);
-        messagesForVertex.add(nextMessage);
-      }
-    }
+    vertexIdMessageMap = new ByteArrayVertexIdMessages<I, M>();
+    vertexIdMessageMap.setConf(getConf());
+    vertexIdMessageMap.initialize();
+    vertexIdMessageMap.readFields(input);
   }
 
   @Override
   public void writeRequest(DataOutput output) throws IOException {
     output.writeInt(partitionId);
-    output.writeInt(vertexMessageMap.size());
-    for (Entry<I, Collection<M>> entry : vertexMessageMap.entrySet()) {
-      entry.getKey().write(output);
-      output.writeInt(entry.getValue().size());
-      for (M message : entry.getValue()) {
-        message.write(output);
-      }
-    }
+    vertexIdMessageMap.write(output);
   }
 
   @Override
   public void doRequest(ServerData<I, V, E, M> serverData) {
     try {
-      serverData.getCurrentMessageStore().addPartitionMessages(
-        vertexMessageMap, partitionId);
+      serverData.getCurrentMessageStore().addPartitionMessages(partitionId,
+          vertexIdMessageMap);
     } catch (IOException e) {
       throw new RuntimeException("doRequest: Got IOException ", e);
     }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java Tue Nov 27 20:01:38 2012
@@ -22,7 +22,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.PairList;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -48,7 +48,7 @@ public class SendWorkerMessagesRequest<I
    * are owned by a single (destination) worker. These messages are all
    * destined for this worker.
    * */
-  private PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>
+  private PairList<Integer, ByteArrayVertexIdMessages<I, M>>
   partitionVertexMessages;
 
   /**
@@ -60,10 +60,10 @@ public class SendWorkerMessagesRequest<I
    * Constructor used to send request.
    *
    * @param partVertMsgs Map of remote partitions =>
-   *                     ByteArrayVertexIdMessageCollection
+   *                     ByteArrayVertexIdMessages
    */
   public SendWorkerMessagesRequest(
-    PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>> partVertMsgs) {
+    PairList<Integer, ByteArrayVertexIdMessages<I, M>> partVertMsgs) {
     super();
     this.partitionVertexMessages = partVertMsgs;
   }
@@ -72,12 +72,12 @@ public class SendWorkerMessagesRequest<I
   public void readFieldsRequest(DataInput input) throws IOException {
     int numPartitions = input.readInt();
     partitionVertexMessages =
-        new PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>();
+        new PairList<Integer, ByteArrayVertexIdMessages<I, M>>();
     partitionVertexMessages.initialize(numPartitions);
     while (numPartitions-- > 0) {
       final int partitionId = input.readInt();
-      ByteArrayVertexIdMessageCollection<I, M> vertexIdMessages =
-          new ByteArrayVertexIdMessageCollection<I, M>();
+      ByteArrayVertexIdMessages<I, M> vertexIdMessages =
+          new ByteArrayVertexIdMessages<I, M>();
       vertexIdMessages.setConf(getConf());
       vertexIdMessages.readFields(input);
       partitionVertexMessages.add(partitionId, vertexIdMessages);
@@ -87,7 +87,7 @@ public class SendWorkerMessagesRequest<I
   @Override
   public void writeRequest(DataOutput output) throws IOException {
     output.writeInt(partitionVertexMessages.getSize());
-    PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>.Iterator
+    PairList<Integer, ByteArrayVertexIdMessages<I, M>>.Iterator
         iterator = partitionVertexMessages.getIterator();
     while (iterator.hasNext()) {
       iterator.next();
@@ -103,14 +103,14 @@ public class SendWorkerMessagesRequest<I
 
   @Override
   public void doRequest(ServerData<I, V, E, M> serverData) {
-    PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>.Iterator
+    PairList<Integer, ByteArrayVertexIdMessages<I, M>>.Iterator
         iterator = partitionVertexMessages.getIterator();
     while (iterator.hasNext()) {
       iterator.next();
       try {
-        serverData.getIncomingMessageStore()
-            .addPartitionMessages(iterator.getCurrentSecond(),
-                iterator.getCurrentFirst());
+        serverData.getIncomingMessageStore().
+            addPartitionMessages(iterator.getCurrentFirst(),
+                iterator.getCurrentSecond());
       } catch (IOException e) {
         throw new RuntimeException("doRequest: Got IOException ", e);
       }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java Tue Nov 27 20:01:38 2012
@@ -46,7 +46,7 @@ public class SimpleMsgVertex extends
       for (IntWritable message : messages) {
         sum += message.get();
       }
-      LOG.info("TestMsgVertex: Received a sum of " + sum +
+      LOG.info("compute: Received a sum of " + sum +
           " (will stop on 306)");
 
       if (sum == 306) {
@@ -54,7 +54,7 @@ public class SimpleMsgVertex extends
       }
     }
     if (getSuperstep() > 3) {
-      System.err.println("TestMsgVertex: Vertex 1 failed to receive " +
+      System.err.println("compute: Vertex 1 failed to receive " +
           "messages in time");
       voteToHalt();
     }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java Tue Nov 27 20:01:38 2012
@@ -17,9 +17,15 @@
  */
 package org.apache.giraph.graph;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.yammer.metrics.core.Timer;
 import com.yammer.metrics.core.TimerContext;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
@@ -28,22 +34,17 @@ import org.apache.giraph.comm.netty.Nett
 import org.apache.giraph.graph.partition.Partition;
 import org.apache.giraph.graph.partition.PartitionStats;
 import org.apache.giraph.metrics.GiraphMetrics;
+
+import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.SystemTime;
 import org.apache.giraph.utils.Time;
-import org.apache.giraph.utils.Times;
-import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.TimedLogger;
+import org.apache.giraph.utils.Times;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-
 /**
  * Compute as many vertex partitions as possible.  Every thread will has its
  * own instance of WorkerClientRequestProcessor to send requests.  Note that
@@ -189,10 +190,9 @@ public class ComputeCallable<I extends W
         // Make sure every vertex has this thread's
         // graphState before computing
         vertex.setGraphState(graphState);
-        Collection<M> messages =
+        Iterable<M> messages =
             messageStore.getVertexMessages(vertex.getId());
-        messageStore.clearVertexMessages(vertex.getId());
-        if (vertex.isHalted() && !messages.isEmpty()) {
+        if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
           vertex.wakeUp();
         }
         if (!vertex.isHalted()) {
@@ -209,6 +209,10 @@ public class ComputeCallable<I extends W
         if (vertex.isHalted()) {
           partitionStats.incrFinishedVertexCount();
         }
+        // Remove the messages now that the vertex has finished computation
+        messageStore.clearVertexMessages(vertex.getId());
+
+        // Add statistics for this vertex
         partitionStats.incrVertexCount();
         partitionStats.addEdgeCount(vertex.getNumEdges());
       }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java Tue Nov 27 20:01:38 2012
@@ -98,7 +98,8 @@ public abstract class Vertex<I extends W
    * Must be defined by user to do computation on a single Vertex.
    *
    * @param messages Messages that were sent to this vertex in the previous
-   *                 superstep
+   *                 superstep.  Each message is only guaranteed to
+   *                 remain valid until the next call to next().
    * @throws IOException
    */
   public abstract void compute(Iterable<M> messages) throws IOException;