You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/10/29 19:03:52 UTC

svn commit: r1403451 - in /giraph/trunk: ./ giraph/src/main/java/org/apache/giraph/comm/ giraph/src/main/java/org/apache/giraph/comm/messages/ giraph/src/main/java/org/apache/giraph/comm/netty/ giraph/src/main/java/org/apache/giraph/comm/requests/ gira...

Author: aching
Date: Mon Oct 29 18:03:51 2012
New Revision: 1403451

URL: http://svn.apache.org/viewvc?rev=1403451&view=rev
Log:
GIRAPH-388: Improve the way we keep outgoing messages (majakabiljo
via aching).

Added:
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1403451&r1=1403450&r2=1403451&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Oct 29 18:03:51 2012
@@ -1,7 +1,11 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
-  GIRAPH-389: Multithreading should intelligently allocate the thread pools. (aching via ereisman)
+  GIRAPH-388: Improve the way we keep outgoing messages (majakabiljo
+  via aching).
+
+  GIRAPH-389: Multithreading should intelligently allocate the thread
+  pools. (aching via ereisman)
 
   GIRAPH-273: Aggregators shouldn't use Zookeeper (majakabiljo)
 

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java?rev=1403451&r1=1403450&r2=1403451&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java Mon Oct 29 18:03:51 2012
@@ -18,9 +18,6 @@
 
 package org.apache.giraph.comm;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -30,8 +27,6 @@ import org.apache.giraph.graph.WorkerInf
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.google.common.collect.Lists;
-
 /**
  * Aggregates the messages to be send to workers so they can be sent
  * in bulk.  Not thread-safe.
@@ -45,11 +40,14 @@ public class SendMessageCache<I extends 
   /** Combiner instance, can be null */
   private final VertexCombiner<I, M> combiner;
   /** Internal cache */
-  private Map<WorkerInfo, Map<Integer, Map<I, Collection<M>>>> messageCache =
-      new HashMap<WorkerInfo, Map<Integer, Map<I, Collection<M>>>>();
+  private Map<WorkerInfo, Map<Integer, VertexIdMessageCollection<I, M>>>
+  messageCache =
+      new HashMap<WorkerInfo, Map<Integer, VertexIdMessageCollection<I, M>>>();
   /** Number of messages in each partition */
   private final Map<WorkerInfo, Integer> messageCountMap =
       new HashMap<WorkerInfo, Integer>();
+  /** Giraph configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
 
   /**
    * Constructor
@@ -57,6 +55,7 @@ public class SendMessageCache<I extends 
    * @param conf Configuration used for instantiating the combiner.
    */
   public SendMessageCache(ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
     if (conf.getVertexCombinerClass() == null) {
       this.combiner = null;
     } else {
@@ -77,36 +76,21 @@ public class SendMessageCache<I extends 
   public int addMessage(WorkerInfo workerInfo,
     final int partitionId, I destVertexId, M message) {
     // Get the message collection
-    Map<Integer, Map<I, Collection<M>>> partitionMap =
+    Map<Integer, VertexIdMessageCollection<I, M>> partitionMap =
       messageCache.get(workerInfo);
     if (partitionMap == null) {
-      partitionMap = new HashMap<Integer, Map<I, Collection<M>>>();
+      partitionMap = new HashMap<Integer, VertexIdMessageCollection<I, M>>();
       messageCache.put(workerInfo, partitionMap);
     }
-    Map<I, Collection<M>> idMessagesMap = partitionMap.get(partitionId);
-
-    if (idMessagesMap == null) {
-      idMessagesMap = new HashMap<I, Collection<M>>();
-      partitionMap.put(partitionId, idMessagesMap);
-    }
-    Collection<M> messages = idMessagesMap.get(destVertexId);
-    if (messages == null) {
-      messages = new ArrayList<M>(1);
-      idMessagesMap.put(destVertexId, messages);
-    }
+    VertexIdMessageCollection<I, M> vertexMessages =
+        partitionMap.get(partitionId);
 
-    // Add the message
-    final int originalMessageCount = messages.size();
-    messages.add(message);
-    if (combiner != null && originalMessageCount > 0) {
-      try {
-        messages = Lists.newArrayList(combiner.combine(destVertexId, messages));
-      } catch (IOException e) {
-        throw new IllegalStateException(
-            "addMessage: Combiner failed to combine messages " + messages, e);
-      }
-      idMessagesMap.put(destVertexId, messages);
+    if (vertexMessages == null) {
+      vertexMessages = new VertexIdMessageCollection<I, M>(conf);
+      vertexMessages.initialize();
+      partitionMap.put(partitionId, vertexMessages);
     }
+    vertexMessages.add(destVertexId, message);
 
     // Update the number of cached, outgoing messages per worker
     Integer currentWorkerMessageCount = messageCountMap.get(workerInfo);
@@ -114,7 +98,7 @@ public class SendMessageCache<I extends 
       currentWorkerMessageCount = 0;
     }
     final int updatedWorkerMessageCount =
-        currentWorkerMessageCount + messages.size() - originalMessageCount;
+        currentWorkerMessageCount + 1;
     messageCountMap.put(workerInfo, updatedWorkerMessageCount);
     return updatedWorkerMessageCount;
   }
@@ -127,10 +111,10 @@ public class SendMessageCache<I extends 
    * @return Map of all messages (keyed by partition ID's) destined
    *         for vertices hosted by <code>workerInfo</code>
    */
-  public Map<Integer, Map<I, Collection<M>>> removeWorkerMessages(
-    WorkerInfo workerInfo) {
-    Map<Integer, Map<I, Collection<M>>> workerMessages =
-      messageCache.remove(workerInfo);
+  public Map<Integer, VertexIdMessageCollection<I, M>> removeWorkerMessages(
+      WorkerInfo workerInfo) {
+    Map<Integer, VertexIdMessageCollection<I, M>> workerMessages =
+        messageCache.remove(workerInfo);
     messageCountMap.put(workerInfo, 0);
     return workerMessages;
   }
@@ -141,11 +125,12 @@ public class SendMessageCache<I extends 
    * @return All vertex messages for all partitions
    */
   public Map<WorkerInfo, Map<
-    Integer, Map<I, Collection<M>>>> removeAllMessages() {
-    Map<WorkerInfo, Map<Integer, Map<I, Collection<M>>>>
-      allMessages = messageCache;
+      Integer, VertexIdMessageCollection<I, M>>> removeAllMessages() {
+    Map<WorkerInfo, Map<Integer, VertexIdMessageCollection<I, M>>>
+        allMessages = messageCache;
     messageCache =
-      new HashMap<WorkerInfo, Map<Integer, Map<I, Collection<M>>>>();
+        new HashMap<WorkerInfo,
+            Map<Integer, VertexIdMessageCollection<I, M>>>();
     messageCountMap.clear();
     return allMessages;
   }

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java?rev=1403451&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java Mon Oct 29 18:03:51 2012
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.Lists;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Holder for pairs of vertex ids and messages. Not thread-safe.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public class VertexIdMessageCollection<I extends WritableComparable,
+    M extends Writable> implements Writable {
+  /** List of ids of vertices */
+  private List<I> vertexIds;
+  /** List of messages */
+  private List<M> messages;
+  /** Giraph configuration */
+  private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> conf;
+
+  /**
+   * Constructor.
+   * Doesn't create the inner lists. If the object is going to be
+   * deserialized lists will be created in {@code readFields()},
+   * otherwise you should call {@code initialize()} before using this object.
+   *
+   * @param conf Giraph configuration
+   */
+  public VertexIdMessageCollection(
+      ImmutableClassesGiraphConfiguration<I, ?, ?, M> conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Initialize the inner state. Must be called before {@code add()} is
+   * called. If you want to call {@code readFields()} you don't need to call
+   * this method.
+   */
+  public void initialize() {
+    vertexIds = Lists.newArrayList();
+    messages = Lists.newArrayList();
+  }
+
+  /**
+   * Adds message for vertex with selected id.
+   *
+   * @param vertexId Id of vertex
+   * @param message  Message to add
+   */
+  public void add(I vertexId, M message) {
+    vertexIds.add(vertexId);
+    messages.add(message);
+  }
+
+  @Override
+  public void write(DataOutput dataOutput) throws IOException {
+    dataOutput.writeInt(vertexIds.size());
+    for (int i = 0; i < vertexIds.size(); i++) {
+      vertexIds.get(i).write(dataOutput);
+      messages.get(i).write(dataOutput);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    int messageCount = input.readInt();
+    vertexIds = Lists.newArrayListWithCapacity(messageCount);
+    messages = Lists.newArrayListWithCapacity(messageCount);
+    while (messageCount-- > 0) {
+      I vertexId = conf.createVertexId();
+      vertexId.readFields(input);
+      vertexIds.add(vertexId);
+      M message = conf.createMessageValue();
+      message.readFields(input);
+      messages.add(message);
+    }
+  }
+
+  /**
+   * Get iterator through destination vertices and messages.
+   *
+   * @return {@link Iterator} iterator
+   */
+  public Iterator getIterator() {
+    return new Iterator();
+  }
+
+  /**
+   * Special iterator class which we'll use to iterate through elements of
+   * {@link VertexIdMessageCollection}, without having to create new object as
+   * wrapper for destination vertex id and message.
+   *
+   * Protocol is somewhat similar to the protocol of {@link java.util.Iterator}
+   * only here next() doesn't return the next object, it just moves along in
+   * the collection. Values related to current pair of (vertex id, message)
+   * can be retrieved by calling getCurrentVertexId() and getCurrentMessage()
+   * methods.
+   *
+   * Not thread-safe.
+   */
+  public class Iterator {
+    /** Current position of the iterator */
+    private int position = -1;
+
+    /**
+     * Returns true if the iteration has more elements.
+     *
+     * @return True if the iteration has more elements.
+     */
+    public boolean hasNext() {
+      return position < messages.size() - 1;
+    }
+
+    /**
+     * Moves to the next element in the iteration.
+     */
+    public void next() {
+      position++;
+    }
+
+    /**
+     * Get vertex id related to current element of the iteration.
+     *
+     * @return Vertex id related to current element of the iteration.
+     */
+    public I getCurrentVertexId() {
+      return vertexIds.get(position);
+    }
+
+    /**
+     * Get message related to current element of the iteration.
+     *
+     * @return Message related to current element of the iteration.
+     */
+    public M getCurrentMessage() {
+      return messages.get(position);
+    }
+  }
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java?rev=1403451&r1=1403450&r2=1403451&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java Mon Oct 29 18:03:51 2012
@@ -19,6 +19,7 @@
 package org.apache.giraph.comm.messages;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.VertexIdMessageCollection;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -100,6 +101,27 @@ public class DiskBackedMessageStoreByPar
   }
 
   @Override
+  public void addPartitionMessages(VertexIdMessageCollection<I, M> messages,
+      int partitionId) throws IOException {
+    Map<I, Collection<M>> map = Maps.newHashMap();
+    VertexIdMessageCollection<I, M>.Iterator iterator = messages.getIterator();
+    while (iterator.hasNext()) {
+      iterator.next();
+      I vertexId = iterator.getCurrentVertexId();
+      M message = iterator.getCurrentMessage();
+      Collection<M> currentMessages = map.get(vertexId);
+      if (currentMessages == null) {
+        currentMessages = Lists.newArrayList(message);
+        map.put(vertexId, currentMessages);
+      } else {
+        currentMessages.add(message);
+      }
+    }
+    getMessageStore(partitionId).addMessages(map);
+    checkMemory();
+  }
+
+  @Override
   public Collection<M> getVertexMessages(I vertexId) throws IOException {
     if (hasMessagesForVertex(vertexId)) {
       return getMessageStore(vertexId).getVertexMessages(vertexId);

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java?rev=1403451&r1=1403450&r2=1403451&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java Mon Oct 29 18:03:51 2012
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.comm.messages;
 
+import org.apache.giraph.comm.VertexIdMessageCollection;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -46,6 +47,16 @@ public interface MessageStoreByPartition
       int partitionId) throws IOException;
 
   /**
+   * Adds messages for partition
+   *
+   * @param messages    Collection of vertex ids and messages we want to add
+   * @param partitionId Id of partition
+   * @throws IOException
+   */
+  void addPartitionMessages(VertexIdMessageCollection<I, M> messages,
+      int partitionId) throws IOException;
+
+  /**
    * Gets vertex ids from selected partition which we have messages for
    *
    * @param partitionId Id of partition

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java?rev=1403451&r1=1403450&r2=1403451&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java Mon Oct 29 18:03:51 2012
@@ -21,6 +21,7 @@ package org.apache.giraph.comm.messages;
 import com.google.common.collect.MapMaker;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.VertexIdMessageCollection;
 import org.apache.giraph.graph.VertexCombiner;
 import org.apache.giraph.utils.CollectionUtils;
 import org.apache.hadoop.io.Writable;
@@ -85,17 +86,8 @@ public class SimpleMessageStore<I extend
   public void addVertexMessages(I vertexId,
       Collection<M> messages) throws IOException {
     int partitionId = getPartitionId(vertexId);
-    ConcurrentMap<I, Collection<M>> partitionMap = map.get(partitionId);
-    if (partitionMap == null) {
-      ConcurrentMap<I, Collection<M>> tmpMap  =
-          new MapMaker().concurrencyLevel(
-              config.getNettyServerExecutionConcurrency()).
-              makeMap();
-      partitionMap = map.putIfAbsent(partitionId, tmpMap);
-      if (partitionMap == null) {
-        partitionMap = map.get(partitionId);
-      }
-    }
+    ConcurrentMap<I, Collection<M>> partitionMap =
+        getOrCreatePartitionMap(partitionId);
     Collection<M> currentMessages =
       CollectionUtils.addConcurrent(vertexId, messages, partitionMap);
     if (combiner != null) {
@@ -117,17 +109,8 @@ public class SimpleMessageStore<I extend
   @Override
   public void addPartitionMessages(Map<I, Collection<M>> messages,
       int partitionId) throws IOException {
-    ConcurrentMap<I, Collection<M>> partitionMap = map.get(partitionId);
-    if (partitionMap == null) {
-      ConcurrentMap<I, Collection<M>> tmpMap  =
-          new MapMaker().concurrencyLevel(
-              config.getNettyServerExecutionConcurrency()).
-              makeMap();
-      partitionMap = map.putIfAbsent(partitionId, tmpMap);
-      if (partitionMap == null) {
-        partitionMap = map.get(partitionId);
-      }
-    }
+    ConcurrentMap<I, Collection<M>> partitionMap =
+        getOrCreatePartitionMap(partitionId);
 
     for (Entry<I, Collection<M>> entry : messages.entrySet()) {
       Collection<M> currentMessages =
@@ -145,6 +128,61 @@ public class SimpleMessageStore<I extend
   }
 
   @Override
+  public void addPartitionMessages(VertexIdMessageCollection<I, M> messages,
+      int partitionId) throws IOException {
+    ConcurrentMap<I, Collection<M>> partitionMap =
+        getOrCreatePartitionMap(partitionId);
+
+    VertexIdMessageCollection<I, M>.Iterator iterator = messages.getIterator();
+    while (iterator.hasNext()) {
+      iterator.next();
+      I vertexId = iterator.getCurrentVertexId();
+      M message = iterator.getCurrentMessage();
+      Collection<M> currentMessages = partitionMap.get(vertexId);
+      if (currentMessages == null) {
+        Collection<M> newMessages = Lists.newArrayList(message);
+        currentMessages = partitionMap.putIfAbsent(vertexId, newMessages);
+      }
+      // if vertex messages existed before, or putIfAbsent didn't put new list
+      if (currentMessages != null) {
+        synchronized (currentMessages) {
+          currentMessages.add(message);
+          if (combiner != null) {
+            currentMessages =
+                Lists.newArrayList(combiner.combine(vertexId,
+                    currentMessages));
+            partitionMap.put(vertexId, currentMessages);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * If there is already a map of messages related to the partition id
+   * return that map, otherwise create a new one, put it in global map and
+   * return it.
+   *
+   * @param partitionId Id of partition
+   * @return Message map for this partition
+   */
+  private ConcurrentMap<I, Collection<M>> getOrCreatePartitionMap(
+      int partitionId) {
+    ConcurrentMap<I, Collection<M>> partitionMap = map.get(partitionId);
+    if (partitionMap == null) {
+      ConcurrentMap<I, Collection<M>> tmpMap =
+          new MapMaker().concurrencyLevel(
+              config.getNettyServerExecutionConcurrency()).
+              makeMap();
+      partitionMap = map.putIfAbsent(partitionId, tmpMap);
+      if (partitionMap == null) {
+        partitionMap = map.get(partitionId);
+      }
+    }
+    return partitionMap;
+  }
+
+  @Override
   public Collection<M> getVertexMessages(I vertexId) throws IOException {
     ConcurrentMap<I, Collection<M>> partitionMap =
         map.get(getPartitionId(vertexId));

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java?rev=1403451&r1=1403450&r2=1403451&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java Mon Oct 29 18:03:51 2012
@@ -28,6 +28,7 @@ import org.apache.giraph.comm.SendMessag
 import org.apache.giraph.comm.SendMutationsCache;
 import org.apache.giraph.comm.SendPartitionCache;
 import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.VertexIdMessageCollection;
 import org.apache.giraph.comm.WorkerClient;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.comm.messages.MessageStoreByPartition;
@@ -133,7 +134,7 @@ public class NettyWorkerClientRequestPro
     // Send a request if the cache of outgoing message to
     // the remote worker 'workerInfo' is full enough to be flushed
     if (workerMessageCount >= maxMessagesPerWorker) {
-      Map<Integer, Map<I, Collection<M>>> workerMessages =
+      Map<Integer, VertexIdMessageCollection<I, M>> workerMessages =
           sendMessageCache.removeWorkerMessages(workerInfo);
       WritableRequest writableRequest =
           new SendWorkerMessagesRequest<I, V, E, M>(workerMessages);
@@ -305,10 +306,10 @@ public class NettyWorkerClientRequestPro
     sendPartitionCache.clear();
 
     // Execute the remaining sends messages (if any)
-    Map<WorkerInfo, Map<Integer, Map<I, Collection<M>>>>
+    Map<WorkerInfo, Map<Integer, VertexIdMessageCollection<I, M>>>
         remainingMessageCache = sendMessageCache.removeAllMessages();
-    for (Map.Entry<WorkerInfo, Map<Integer, Map<I, Collection<M>>>> entry :
-        remainingMessageCache.entrySet()) {
+    for (Map.Entry<WorkerInfo, Map<Integer, VertexIdMessageCollection<I, M>>>
+        entry : remainingMessageCache.entrySet()) {
       WritableRequest writableRequest =
           new SendWorkerMessagesRequest<I, V, E, M>(entry.getValue());
       doRequest(entry.getKey(), writableRequest);

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java?rev=1403451&r1=1403450&r2=1403451&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java Mon Oct 29 18:03:51 2012
@@ -19,18 +19,16 @@
 package org.apache.giraph.comm.requests;
 
 import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.VertexIdMessageCollection;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -54,7 +52,8 @@ public class SendWorkerMessagesRequest<I
    * are owned by a single (destination) worker. These messages are all
    * destined for this worker.
    * */
-  private Map<Integer, Map<I, Collection<M>>> partitionVertexMessagesMap;
+  private Map<Integer, VertexIdMessageCollection<I, M>>
+  partitionVertexMessagesMap;
 
   /**
    * Constructor used for reflection only
@@ -64,10 +63,11 @@ public class SendWorkerMessagesRequest<I
   /**
    * Constructor used to send request.
    *
-   * @param partVertMsgsMap Map of remote partitions => vertices => messages
+   * @param partVertMsgsMap Map of remote partitions =>
+   *                        VertexIdMessageCollection
    */
   public SendWorkerMessagesRequest(
-    Map<Integer, Map<I, Collection<M>>> partVertMsgsMap) {
+    Map<Integer, VertexIdMessageCollection<I, M>> partVertMsgsMap) {
     super();
     this.partitionVertexMessagesMap = partVertMsgsMap;
   }
@@ -75,48 +75,25 @@ public class SendWorkerMessagesRequest<I
   @Override
   public void readFieldsRequest(DataInput input) throws IOException {
     int numPartitions = input.readInt();
-    partitionVertexMessagesMap = Maps.<Integer, Map<I, Collection<M>>>
-      newHashMapWithExpectedSize(numPartitions);
+    partitionVertexMessagesMap =
+        Maps.<Integer, VertexIdMessageCollection<I, M>>
+            newHashMapWithExpectedSize(numPartitions);
     while (numPartitions-- > 0) {
       final int partitionId = input.readInt();
-      int numVertices = input.readInt();
-      Map<I, Collection<M>> vertexIdMessages =
-        Maps.<I, Collection<M>>newHashMapWithExpectedSize(numVertices);
+      VertexIdMessageCollection<I, M> vertexIdMessages =
+          new VertexIdMessageCollection<I, M>(getConf());
+      vertexIdMessages.readFields(input);
       partitionVertexMessagesMap.put(partitionId, vertexIdMessages);
-      while (numVertices-- > 0) {
-        I vertexId = getConf().createVertexId();
-        vertexId.readFields(input);
-        int messageCount = input.readInt();
-        List<M> messageList =
-          Lists.newArrayListWithExpectedSize(messageCount);
-        while (messageCount-- > 0) {
-          M message = getConf().createMessageValue();
-          message.readFields(input);
-          messageList.add(message);
-        }
-        if (vertexIdMessages.put(vertexId, messageList) != null) {
-          throw new IllegalStateException(
-            "readFields: Already has vertex id " + vertexId);
-        }
-      }
     }
   }
 
   @Override
   public void writeRequest(DataOutput output) throws IOException {
     output.writeInt(partitionVertexMessagesMap.size());
-    for (Entry<Integer, Map<I, Collection<M>>> partitionEntry :
+    for (Entry<Integer, VertexIdMessageCollection<I, M>> partitionEntry :
       partitionVertexMessagesMap.entrySet()) {
       output.writeInt(partitionEntry.getKey());
-      output.writeInt(partitionEntry.getValue().size());
-      for (Entry<I, Collection<M>> vertexEntry :
-        partitionEntry.getValue().entrySet()) {
-        vertexEntry.getKey().write(output);
-        output.writeInt(vertexEntry.getValue().size());
-        for (M message : vertexEntry.getValue()) {
-          message.write(output);
-        }
-      }
+      partitionEntry.getValue().write(output);
     }
   }
 
@@ -127,7 +104,7 @@ public class SendWorkerMessagesRequest<I
 
   @Override
   public void doRequest(ServerData<I, V, E, M> serverData) {
-    for (Entry<Integer, Map<I, Collection<M>>> entry :
+    for (Entry<Integer, VertexIdMessageCollection<I, M>> entry :
       partitionVertexMessagesMap.entrySet()) {
       try {
         serverData.getIncomingMessageStore()

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1403451&r1=1403450&r2=1403451&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java Mon Oct 29 18:03:51 2012
@@ -85,18 +85,17 @@ public class RequestFailureTest {
   private WritableRequest getRequest() {
     // Data to send
     final int partitionId = 0;
-    Map<Integer, Map<IntWritable, Collection<IntWritable>>> sendMap =
-        Maps.newHashMap();
-    Map<IntWritable, Collection<IntWritable>> vertexIdMessages =
+    Map<Integer, VertexIdMessageCollection<IntWritable, IntWritable>> sendMap =
         Maps.newHashMap();
+    VertexIdMessageCollection<IntWritable, IntWritable> vertexIdMessages =
+        new VertexIdMessageCollection<IntWritable, IntWritable>(conf);
+    vertexIdMessages.initialize();
     sendMap.put(partitionId, vertexIdMessages);
     for (int i = 1; i < 7; ++i) {
       IntWritable vertexId = new IntWritable(i);
-      Collection<IntWritable> messages = Lists.newArrayList();
       for (int j = 0; j < i; ++j) {
-        messages.add(new IntWritable(j));
+        vertexIdMessages.add(vertexId, new IntWritable(j));
       }
-      vertexIdMessages.put(vertexId, messages);
     }
 
     // Send the request

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1403451&r1=1403450&r2=1403451&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java Mon Oct 29 18:03:51 2012
@@ -149,19 +149,18 @@ public class RequestTest {
   @Test
   public void sendWorkerMessagesRequest() throws IOException {
     // Data to send
-    Map<Integer, Map<IntWritable, Collection<IntWritable>>> sendMap =
+    Map<Integer, VertexIdMessageCollection<IntWritable, IntWritable>> sendMap =
         Maps.newHashMap();
     int partitionId = 0;
-    Map<IntWritable, Collection<IntWritable>> vertexIdMessages =
-        Maps.newHashMap();
+    VertexIdMessageCollection<IntWritable, IntWritable> vertexIdMessages =
+        new VertexIdMessageCollection<IntWritable, IntWritable>(conf);
+    vertexIdMessages.initialize();
     sendMap.put(partitionId, vertexIdMessages);
     for (int i = 1; i < 7; ++i) {
       IntWritable vertexId = new IntWritable(i);
-      Collection<IntWritable> messages = Lists.newArrayList();
       for (int j = 0; j < i; ++j) {
-        messages.add(new IntWritable(j));
+        vertexIdMessages.add(vertexId, new IntWritable(j));
       }
-      vertexIdMessages.put(vertexId, messages);
     }
 
     // Send the request