You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/10/29 19:03:52 UTC
svn commit: r1403451 - in /giraph/trunk: ./
giraph/src/main/java/org/apache/giraph/comm/
giraph/src/main/java/org/apache/giraph/comm/messages/
giraph/src/main/java/org/apache/giraph/comm/netty/
giraph/src/main/java/org/apache/giraph/comm/requests/ gira...
Author: aching
Date: Mon Oct 29 18:03:51 2012
New Revision: 1403451
URL: http://svn.apache.org/viewvc?rev=1403451&view=rev
Log:
GIRAPH-388: Improve the way we keep outgoing messages (majakabiljo
via aching).
Added:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1403451&r1=1403450&r2=1403451&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Oct 29 18:03:51 2012
@@ -1,7 +1,11 @@
Giraph Change Log
Release 0.2.0 - unreleased
- GIRAPH-389: Multithreading should intelligently allocate the thread pools. (aching via ereisman)
+ GIRAPH-388: Improve the way we keep outgoing messages (majakabiljo
+ via aching).
+
+ GIRAPH-389: Multithreading should intelligently allocate the thread
+ pools. (aching via ereisman)
GIRAPH-273: Aggregators shouldn't use Zookeeper (majakabiljo)
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java?rev=1403451&r1=1403450&r2=1403451&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java Mon Oct 29 18:03:51 2012
@@ -18,9 +18,6 @@
package org.apache.giraph.comm;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -30,8 +27,6 @@ import org.apache.giraph.graph.WorkerInf
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import com.google.common.collect.Lists;
-
/**
* Aggregates the messages destined for workers so they can be sent
* in bulk. Not thread-safe.
@@ -45,11 +40,14 @@ public class SendMessageCache<I extends
/** Combiner instance, can be null */
private final VertexCombiner<I, M> combiner;
/** Internal cache */
- private Map<WorkerInfo, Map<Integer, Map<I, Collection<M>>>> messageCache =
- new HashMap<WorkerInfo, Map<Integer, Map<I, Collection<M>>>>();
+ private Map<WorkerInfo, Map<Integer, VertexIdMessageCollection<I, M>>>
+ messageCache =
+ new HashMap<WorkerInfo, Map<Integer, VertexIdMessageCollection<I, M>>>();
/** Number of messages in each partition */
private final Map<WorkerInfo, Integer> messageCountMap =
new HashMap<WorkerInfo, Integer>();
+ /** Giraph configuration */
+ private final ImmutableClassesGiraphConfiguration conf;
/**
* Constructor
@@ -57,6 +55,7 @@ public class SendMessageCache<I extends
* @param conf Configuration used to instantiate the combiner and message collections.
*/
public SendMessageCache(ImmutableClassesGiraphConfiguration conf) {
+ this.conf = conf;
if (conf.getVertexCombinerClass() == null) {
this.combiner = null;
} else {
@@ -77,36 +76,21 @@ public class SendMessageCache<I extends
public int addMessage(WorkerInfo workerInfo,
final int partitionId, I destVertexId, M message) {
// Get the message collection
- Map<Integer, Map<I, Collection<M>>> partitionMap =
+ Map<Integer, VertexIdMessageCollection<I, M>> partitionMap =
messageCache.get(workerInfo);
if (partitionMap == null) {
- partitionMap = new HashMap<Integer, Map<I, Collection<M>>>();
+ partitionMap = new HashMap<Integer, VertexIdMessageCollection<I, M>>();
messageCache.put(workerInfo, partitionMap);
}
- Map<I, Collection<M>> idMessagesMap = partitionMap.get(partitionId);
-
- if (idMessagesMap == null) {
- idMessagesMap = new HashMap<I, Collection<M>>();
- partitionMap.put(partitionId, idMessagesMap);
- }
- Collection<M> messages = idMessagesMap.get(destVertexId);
- if (messages == null) {
- messages = new ArrayList<M>(1);
- idMessagesMap.put(destVertexId, messages);
- }
+ VertexIdMessageCollection<I, M> vertexMessages =
+ partitionMap.get(partitionId);
- // Add the message
- final int originalMessageCount = messages.size();
- messages.add(message);
- if (combiner != null && originalMessageCount > 0) {
- try {
- messages = Lists.newArrayList(combiner.combine(destVertexId, messages));
- } catch (IOException e) {
- throw new IllegalStateException(
- "addMessage: Combiner failed to combine messages " + messages, e);
- }
- idMessagesMap.put(destVertexId, messages);
+ if (vertexMessages == null) {
+ vertexMessages = new VertexIdMessageCollection<I, M>(conf);
+ vertexMessages.initialize();
+ partitionMap.put(partitionId, vertexMessages);
}
+ vertexMessages.add(destVertexId, message);
// Update the number of cached, outgoing messages per worker
Integer currentWorkerMessageCount = messageCountMap.get(workerInfo);
@@ -114,7 +98,7 @@ public class SendMessageCache<I extends
currentWorkerMessageCount = 0;
}
final int updatedWorkerMessageCount =
- currentWorkerMessageCount + messages.size() - originalMessageCount;
+ currentWorkerMessageCount + 1;
messageCountMap.put(workerInfo, updatedWorkerMessageCount);
return updatedWorkerMessageCount;
}
@@ -127,10 +111,10 @@ public class SendMessageCache<I extends
* @return Map of all messages (keyed by partition ID's) destined
* for vertices hosted by <code>workerInfo</code>
*/
- public Map<Integer, Map<I, Collection<M>>> removeWorkerMessages(
- WorkerInfo workerInfo) {
- Map<Integer, Map<I, Collection<M>>> workerMessages =
- messageCache.remove(workerInfo);
+ public Map<Integer, VertexIdMessageCollection<I, M>> removeWorkerMessages(
+ WorkerInfo workerInfo) {
+ Map<Integer, VertexIdMessageCollection<I, M>> workerMessages =
+ messageCache.remove(workerInfo);
messageCountMap.put(workerInfo, 0);
return workerMessages;
}
@@ -141,11 +125,12 @@ public class SendMessageCache<I extends
* @return All vertex messages for all partitions
*/
public Map<WorkerInfo, Map<
- Integer, Map<I, Collection<M>>>> removeAllMessages() {
- Map<WorkerInfo, Map<Integer, Map<I, Collection<M>>>>
- allMessages = messageCache;
+ Integer, VertexIdMessageCollection<I, M>>> removeAllMessages() {
+ Map<WorkerInfo, Map<Integer, VertexIdMessageCollection<I, M>>>
+ allMessages = messageCache;
messageCache =
- new HashMap<WorkerInfo, Map<Integer, Map<I, Collection<M>>>>();
+ new HashMap<WorkerInfo,
+ Map<Integer, VertexIdMessageCollection<I, M>>>();
messageCountMap.clear();
return allMessages;
}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java?rev=1403451&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java Mon Oct 29 18:03:51 2012
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.Lists;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Holder for (vertex id, message) pairs kept in two parallel lists. Not thread-safe.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public class VertexIdMessageCollection<I extends WritableComparable,
+ M extends Writable> implements Writable {
+ /** Destination vertex ids; entry i pairs with messages.get(i) */
+ private List<I> vertexIds;
+ /** Messages; entry i is destined for vertexIds.get(i) */
+ private List<M> messages;
+ /** Giraph configuration, used by readFields() to create ids and messages */
+ private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> conf;
+
+ /**
+ * Constructor.
+ * Doesn't create the inner lists. If the object is going to be
+ * deserialized, the lists will be created in {@code readFields()};
+ * otherwise you must call {@code initialize()} before using this object.
+ *
+ * @param conf Giraph configuration used to instantiate ids and messages
+ */
+ public VertexIdMessageCollection(
+ ImmutableClassesGiraphConfiguration<I, ?, ?, M> conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Create empty inner lists. Must be called before {@code add()} is
+ * called. Calling it again discards any pairs added so far. Not needed
+ * before {@code readFields()}, which allocates its own lists.
+ */
+ public void initialize() {
+ vertexIds = Lists.newArrayList();
+ messages = Lists.newArrayList();
+ }
+
+ /**
+ * Append a message for the vertex with the given id (no combining is done).
+ *
+ * @param vertexId Id of the destination vertex
+ * @param message Message to add
+ */
+ public void add(I vertexId, M message) {
+ vertexIds.add(vertexId);
+ messages.add(message);
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ dataOutput.writeInt(vertexIds.size()); // pair count, then each pair as (id, message)
+ for (int i = 0; i < vertexIds.size(); i++) {
+ vertexIds.get(i).write(dataOutput);
+ messages.get(i).write(dataOutput);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ int messageCount = input.readInt(); // pair count written by write()
+ vertexIds = Lists.newArrayListWithCapacity(messageCount);
+ messages = Lists.newArrayListWithCapacity(messageCount);
+ while (messageCount-- > 0) {
+ I vertexId = conf.createVertexId();
+ vertexId.readFields(input);
+ vertexIds.add(vertexId);
+ M message = conf.createMessageValue();
+ message.readFields(input);
+ messages.add(message);
+ }
+ }
+
+ /**
+ * Get a cursor-style iterator over (destination vertex, message) pairs.
+ *
+ * @return New {@link Iterator}, positioned before the first pair
+ */
+ public Iterator getIterator() {
+ return new Iterator();
+ }
+
+ /**
+ * Special iterator class used to iterate through the elements of
+ * {@link VertexIdMessageCollection} without creating a new object as a
+ * wrapper for each destination vertex id and message.
+ *
+ * Protocol is somewhat similar to the protocol of {@link java.util.Iterator},
+ * except that next() doesn't return the next object; it only advances in
+ * the collection. Values of the current (vertex id, message) pair
+ * can be retrieved by calling getCurrentVertexId() and getCurrentMessage()
+ * methods; call next() before the first access.
+ *
+ * Not thread-safe.
+ */
+ public class Iterator {
+ /** Index of the current pair; -1 until next() is first called */
+ private int position = -1;
+
+ /**
+ * Returns true if the iteration has more elements.
+ *
+ * @return True if the iteration has more elements.
+ */
+ public boolean hasNext() {
+ return position < messages.size() - 1;
+ }
+
+ /**
+ * Advance to the next element (no bounds check; call hasNext() first).
+ */
+ public void next() {
+ position++;
+ }
+
+ /**
+ * Get the vertex id of the current element of the iteration.
+ *
+ * @return Vertex id of the current element of the iteration.
+ */
+ public I getCurrentVertexId() {
+ return vertexIds.get(position);
+ }
+
+ /**
+ * Get the message of the current element of the iteration.
+ *
+ * @return Message of the current element of the iteration.
+ */
+ public M getCurrentMessage() {
+ return messages.get(position);
+ }
+ }
+}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java?rev=1403451&r1=1403450&r2=1403451&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java Mon Oct 29 18:03:51 2012
@@ -19,6 +19,7 @@
package org.apache.giraph.comm.messages;
import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.VertexIdMessageCollection;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -100,6 +101,27 @@ public class DiskBackedMessageStoreByPar
}
@Override
+ public void addPartitionMessages(VertexIdMessageCollection<I, M> messages,
+ int partitionId) throws IOException {
+ Map<I, Collection<M>> map = Maps.newHashMap(); // regroup the flat (id, message) pairs by vertex id
+ VertexIdMessageCollection<I, M>.Iterator iterator = messages.getIterator();
+ while (iterator.hasNext()) {
+ iterator.next();
+ I vertexId = iterator.getCurrentVertexId();
+ M message = iterator.getCurrentMessage();
+ Collection<M> currentMessages = map.get(vertexId);
+ if (currentMessages == null) {
+ currentMessages = Lists.newArrayList(message);
+ map.put(vertexId, currentMessages);
+ } else {
+ currentMessages.add(message);
+ }
+ }
+ getMessageStore(partitionId).addMessages(map); // hand the grouped messages to this partition's store
+ checkMemory();
+ }
+
+ @Override
public Collection<M> getVertexMessages(I vertexId) throws IOException {
if (hasMessagesForVertex(vertexId)) {
return getMessageStore(vertexId).getVertexMessages(vertexId);
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java?rev=1403451&r1=1403450&r2=1403451&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java Mon Oct 29 18:03:51 2012
@@ -18,6 +18,7 @@
package org.apache.giraph.comm.messages;
+import org.apache.giraph.comm.VertexIdMessageCollection;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -46,6 +47,16 @@ public interface MessageStoreByPartition
int partitionId) throws IOException;
/**
+ * Adds messages for partition
+ *
+ * @param messages Collection of vertex ids and messages we want to add
+ * @param partitionId Id of partition
+ * @throws IOException
+ */
+ void addPartitionMessages(VertexIdMessageCollection<I, M> messages,
+ int partitionId) throws IOException;
+
+ /**
* Gets vertex ids from selected partition which we have messages for
*
* @param partitionId Id of partition
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java?rev=1403451&r1=1403450&r2=1403451&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java Mon Oct 29 18:03:51 2012
@@ -21,6 +21,7 @@ package org.apache.giraph.comm.messages;
import com.google.common.collect.MapMaker;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.VertexIdMessageCollection;
import org.apache.giraph.graph.VertexCombiner;
import org.apache.giraph.utils.CollectionUtils;
import org.apache.hadoop.io.Writable;
@@ -85,17 +86,8 @@ public class SimpleMessageStore<I extend
public void addVertexMessages(I vertexId,
Collection<M> messages) throws IOException {
int partitionId = getPartitionId(vertexId);
- ConcurrentMap<I, Collection<M>> partitionMap = map.get(partitionId);
- if (partitionMap == null) {
- ConcurrentMap<I, Collection<M>> tmpMap =
- new MapMaker().concurrencyLevel(
- config.getNettyServerExecutionConcurrency()).
- makeMap();
- partitionMap = map.putIfAbsent(partitionId, tmpMap);
- if (partitionMap == null) {
- partitionMap = map.get(partitionId);
- }
- }
+ ConcurrentMap<I, Collection<M>> partitionMap =
+ getOrCreatePartitionMap(partitionId);
Collection<M> currentMessages =
CollectionUtils.addConcurrent(vertexId, messages, partitionMap);
if (combiner != null) {
@@ -117,17 +109,8 @@ public class SimpleMessageStore<I extend
@Override
public void addPartitionMessages(Map<I, Collection<M>> messages,
int partitionId) throws IOException {
- ConcurrentMap<I, Collection<M>> partitionMap = map.get(partitionId);
- if (partitionMap == null) {
- ConcurrentMap<I, Collection<M>> tmpMap =
- new MapMaker().concurrencyLevel(
- config.getNettyServerExecutionConcurrency()).
- makeMap();
- partitionMap = map.putIfAbsent(partitionId, tmpMap);
- if (partitionMap == null) {
- partitionMap = map.get(partitionId);
- }
- }
+ ConcurrentMap<I, Collection<M>> partitionMap =
+ getOrCreatePartitionMap(partitionId);
for (Entry<I, Collection<M>> entry : messages.entrySet()) {
Collection<M> currentMessages =
@@ -145,6 +128,61 @@ public class SimpleMessageStore<I extend
}
@Override
+ public void addPartitionMessages(VertexIdMessageCollection<I, M> messages,
+ int partitionId) throws IOException {
+ ConcurrentMap<I, Collection<M>> partitionMap =
+ getOrCreatePartitionMap(partitionId);
+
+ VertexIdMessageCollection<I, M>.Iterator iterator = messages.getIterator();
+ while (iterator.hasNext()) {
+ iterator.next();
+ I vertexId = iterator.getCurrentVertexId();
+ M message = iterator.getCurrentMessage();
+ Collection<M> currentMessages = partitionMap.get(vertexId);
+ if (currentMessages == null) {
+ Collection<M> newMessages = Lists.newArrayList(message);
+ currentMessages = partitionMap.putIfAbsent(vertexId, newMessages);
+ }
+ // Non-null: a list already existed or a racing thread inserted one first; null: our new list (already holding the message) was inserted
+ if (currentMessages != null) {
+ synchronized (currentMessages) {
+ currentMessages.add(message);
+ if (combiner != null) {
+ currentMessages =
+ Lists.newArrayList(combiner.combine(vertexId,
+ currentMessages));
+ partitionMap.put(vertexId, currentMessages); // NOTE(review): racing threads may hold the old vs. new list and lock different monitors, so one put can overwrite another's combined result
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Return the message map for the partition id, creating a new one and
+ * publishing it with putIfAbsent if none exists yet; a losing racer
+ * gets the winner's map (assuming {@code map} is a ConcurrentMap).
+ * return it.
+ *
+ * @param partitionId Id of partition
+ * @return Message map for this partition
+ */
+ private ConcurrentMap<I, Collection<M>> getOrCreatePartitionMap(
+ int partitionId) {
+ ConcurrentMap<I, Collection<M>> partitionMap = map.get(partitionId);
+ if (partitionMap == null) {
+ ConcurrentMap<I, Collection<M>> tmpMap =
+ new MapMaker().concurrencyLevel(
+ config.getNettyServerExecutionConcurrency()).
+ makeMap();
+ partitionMap = map.putIfAbsent(partitionId, tmpMap);
+ if (partitionMap == null) {
+ partitionMap = map.get(partitionId); // our tmpMap was inserted; read it back
+ }
+ }
+ return partitionMap;
+ }
+
+ @Override
public Collection<M> getVertexMessages(I vertexId) throws IOException {
ConcurrentMap<I, Collection<M>> partitionMap =
map.get(getPartitionId(vertexId));
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java?rev=1403451&r1=1403450&r2=1403451&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java Mon Oct 29 18:03:51 2012
@@ -28,6 +28,7 @@ import org.apache.giraph.comm.SendMessag
import org.apache.giraph.comm.SendMutationsCache;
import org.apache.giraph.comm.SendPartitionCache;
import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.VertexIdMessageCollection;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.messages.MessageStoreByPartition;
@@ -133,7 +134,7 @@ public class NettyWorkerClientRequestPro
// Send a request if the cache of outgoing message to
// the remote worker 'workerInfo' is full enough to be flushed
if (workerMessageCount >= maxMessagesPerWorker) {
- Map<Integer, Map<I, Collection<M>>> workerMessages =
+ Map<Integer, VertexIdMessageCollection<I, M>> workerMessages =
sendMessageCache.removeWorkerMessages(workerInfo);
WritableRequest writableRequest =
new SendWorkerMessagesRequest<I, V, E, M>(workerMessages);
@@ -305,10 +306,10 @@ public class NettyWorkerClientRequestPro
sendPartitionCache.clear();
// Execute the remaining sends messages (if any)
- Map<WorkerInfo, Map<Integer, Map<I, Collection<M>>>>
+ Map<WorkerInfo, Map<Integer, VertexIdMessageCollection<I, M>>>
remainingMessageCache = sendMessageCache.removeAllMessages();
- for (Map.Entry<WorkerInfo, Map<Integer, Map<I, Collection<M>>>> entry :
- remainingMessageCache.entrySet()) {
+ for (Map.Entry<WorkerInfo, Map<Integer, VertexIdMessageCollection<I, M>>>
+ entry : remainingMessageCache.entrySet()) {
WritableRequest writableRequest =
new SendWorkerMessagesRequest<I, V, E, M>(entry.getValue());
doRequest(entry.getKey(), writableRequest);
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java?rev=1403451&r1=1403450&r2=1403451&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java Mon Oct 29 18:03:51 2012
@@ -19,18 +19,16 @@
package org.apache.giraph.comm.requests;
import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.VertexIdMessageCollection;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -54,7 +52,8 @@ public class SendWorkerMessagesRequest<I
* are owned by a single (destination) worker. These messages are all
* destined for this worker.
* */
- private Map<Integer, Map<I, Collection<M>>> partitionVertexMessagesMap;
+ private Map<Integer, VertexIdMessageCollection<I, M>>
+ partitionVertexMessagesMap;
/**
* Constructor used for reflection only
@@ -64,10 +63,11 @@ public class SendWorkerMessagesRequest<I
/**
* Constructor used to send request.
*
- * @param partVertMsgsMap Map of remote partitions => vertices => messages
+ * @param partVertMsgsMap Map of remote partitions =>
+ * VertexIdMessageCollection
*/
public SendWorkerMessagesRequest(
- Map<Integer, Map<I, Collection<M>>> partVertMsgsMap) {
+ Map<Integer, VertexIdMessageCollection<I, M>> partVertMsgsMap) {
super();
this.partitionVertexMessagesMap = partVertMsgsMap;
}
@@ -75,48 +75,25 @@ public class SendWorkerMessagesRequest<I
@Override
public void readFieldsRequest(DataInput input) throws IOException {
int numPartitions = input.readInt();
- partitionVertexMessagesMap = Maps.<Integer, Map<I, Collection<M>>>
- newHashMapWithExpectedSize(numPartitions);
+ partitionVertexMessagesMap =
+ Maps.<Integer, VertexIdMessageCollection<I, M>>
+ newHashMapWithExpectedSize(numPartitions);
while (numPartitions-- > 0) {
final int partitionId = input.readInt();
- int numVertices = input.readInt();
- Map<I, Collection<M>> vertexIdMessages =
- Maps.<I, Collection<M>>newHashMapWithExpectedSize(numVertices);
+ VertexIdMessageCollection<I, M> vertexIdMessages =
+ new VertexIdMessageCollection<I, M>(getConf());
+ vertexIdMessages.readFields(input);
partitionVertexMessagesMap.put(partitionId, vertexIdMessages);
- while (numVertices-- > 0) {
- I vertexId = getConf().createVertexId();
- vertexId.readFields(input);
- int messageCount = input.readInt();
- List<M> messageList =
- Lists.newArrayListWithExpectedSize(messageCount);
- while (messageCount-- > 0) {
- M message = getConf().createMessageValue();
- message.readFields(input);
- messageList.add(message);
- }
- if (vertexIdMessages.put(vertexId, messageList) != null) {
- throw new IllegalStateException(
- "readFields: Already has vertex id " + vertexId);
- }
- }
}
}
@Override
public void writeRequest(DataOutput output) throws IOException {
output.writeInt(partitionVertexMessagesMap.size());
- for (Entry<Integer, Map<I, Collection<M>>> partitionEntry :
+ for (Entry<Integer, VertexIdMessageCollection<I, M>> partitionEntry :
partitionVertexMessagesMap.entrySet()) {
output.writeInt(partitionEntry.getKey());
- output.writeInt(partitionEntry.getValue().size());
- for (Entry<I, Collection<M>> vertexEntry :
- partitionEntry.getValue().entrySet()) {
- vertexEntry.getKey().write(output);
- output.writeInt(vertexEntry.getValue().size());
- for (M message : vertexEntry.getValue()) {
- message.write(output);
- }
- }
+ partitionEntry.getValue().write(output);
}
}
@@ -127,7 +104,7 @@ public class SendWorkerMessagesRequest<I
@Override
public void doRequest(ServerData<I, V, E, M> serverData) {
- for (Entry<Integer, Map<I, Collection<M>>> entry :
+ for (Entry<Integer, VertexIdMessageCollection<I, M>> entry :
partitionVertexMessagesMap.entrySet()) {
try {
serverData.getIncomingMessageStore()
Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1403451&r1=1403450&r2=1403451&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java Mon Oct 29 18:03:51 2012
@@ -85,18 +85,17 @@ public class RequestFailureTest {
private WritableRequest getRequest() {
// Data to send
final int partitionId = 0;
- Map<Integer, Map<IntWritable, Collection<IntWritable>>> sendMap =
- Maps.newHashMap();
- Map<IntWritable, Collection<IntWritable>> vertexIdMessages =
+ Map<Integer, VertexIdMessageCollection<IntWritable, IntWritable>> sendMap =
Maps.newHashMap();
+ VertexIdMessageCollection<IntWritable, IntWritable> vertexIdMessages =
+ new VertexIdMessageCollection<IntWritable, IntWritable>(conf);
+ vertexIdMessages.initialize();
sendMap.put(partitionId, vertexIdMessages);
for (int i = 1; i < 7; ++i) {
IntWritable vertexId = new IntWritable(i);
- Collection<IntWritable> messages = Lists.newArrayList();
for (int j = 0; j < i; ++j) {
- messages.add(new IntWritable(j));
+ vertexIdMessages.add(vertexId, new IntWritable(j));
}
- vertexIdMessages.put(vertexId, messages);
}
// Send the request
Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1403451&r1=1403450&r2=1403451&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java Mon Oct 29 18:03:51 2012
@@ -149,19 +149,18 @@ public class RequestTest {
@Test
public void sendWorkerMessagesRequest() throws IOException {
// Data to send
- Map<Integer, Map<IntWritable, Collection<IntWritable>>> sendMap =
+ Map<Integer, VertexIdMessageCollection<IntWritable, IntWritable>> sendMap =
Maps.newHashMap();
int partitionId = 0;
- Map<IntWritable, Collection<IntWritable>> vertexIdMessages =
- Maps.newHashMap();
+ VertexIdMessageCollection<IntWritable, IntWritable> vertexIdMessages =
+ new VertexIdMessageCollection<IntWritable, IntWritable>(conf);
+ vertexIdMessages.initialize();
sendMap.put(partitionId, vertexIdMessages);
for (int i = 1; i < 7; ++i) {
IntWritable vertexId = new IntWritable(i);
- Collection<IntWritable> messages = Lists.newArrayList();
for (int j = 0; j < i; ++j) {
- messages.add(new IntWritable(j));
+ vertexIdMessages.add(vertexId, new IntWritable(j));
}
- vertexIdMessages.put(vertexId, messages);
}
// Send the request