You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/09/25 19:51:40 UTC
svn commit: r1390018 -
/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
Author: aching
Date: Tue Sep 25 17:51:40 2012
New Revision: 1390018
URL: http://svn.apache.org/viewvc?rev=1390018&view=rev
Log:
GIRAPH-339: Optimization to SimpleMessageStore#addPartitionMessages to
avoid getting the partition map for every vertex.
Modified:
giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java?rev=1390018&r1=1390017&r2=1390018&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java Tue Sep 25 17:51:40 2012
@@ -18,6 +18,7 @@
package org.apache.giraph.comm.messages;
+import com.google.common.collect.MapMaker;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.graph.VertexCombiner;
@@ -74,7 +75,8 @@ public class SimpleMessageStore<I extend
VertexCombiner<I, M> combiner,
ImmutableClassesGiraphConfiguration<I, V, E, M> config) {
this.service = service;
- map = Maps.newConcurrentMap();
+ map = new MapMaker().concurrencyLevel(
+ config.getNettyServerExecutionConcurrency()).makeMap();
this.combiner = combiner;
this.config = config;
}
@@ -85,8 +87,11 @@ public class SimpleMessageStore<I extend
int partitionId = getPartitonId(vertexId);
ConcurrentMap<I, Collection<M>> partitionMap = map.get(partitionId);
if (partitionMap == null) {
- partitionMap = map.putIfAbsent(partitionId,
- Maps.<I, Collection<M>>newConcurrentMap());
+ ConcurrentMap<I, Collection<M>> tmpMap =
+ new MapMaker().concurrencyLevel(
+ config.getNettyServerExecutionConcurrency()).
+ makeMap();
+ partitionMap = map.putIfAbsent(partitionId, tmpMap);
if (partitionMap == null) {
partitionMap = map.get(partitionId);
}
@@ -113,7 +118,31 @@ public class SimpleMessageStore<I extend
@Override
public void addPartitionMessages(Map<I, Collection<M>> messages,
int partitionId) throws IOException {
- addMessages(messages);
+ ConcurrentMap<I, Collection<M>> partitionMap = map.get(partitionId);
+ if (partitionMap == null) {
+ ConcurrentMap<I, Collection<M>> tmpMap =
+ new MapMaker().concurrencyLevel(
+ config.getNettyServerExecutionConcurrency()).
+ makeMap();
+ partitionMap = map.putIfAbsent(partitionId, tmpMap);
+ if (partitionMap == null) {
+ partitionMap = map.get(partitionId);
+ }
+ }
+
+ for (Entry<I, Collection<M>> entry : messages.entrySet()) {
+ Collection<M> currentMessages =
+ CollectionUtils.addConcurrent(
+ entry.getKey(), entry.getValue(), partitionMap);
+ if (combiner != null) {
+ synchronized (currentMessages) {
+ currentMessages =
+ Lists.newArrayList(combiner.combine(entry.getKey(),
+ currentMessages));
+ partitionMap.put(entry.getKey(), currentMessages);
+ }
+ }
+ }
}
@Override