You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/09/25 19:51:40 UTC

svn commit: r1390018 - /giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java

Author: aching
Date: Tue Sep 25 17:51:40 2012
New Revision: 1390018

URL: http://svn.apache.org/viewvc?rev=1390018&view=rev
Log:
GIRAPH-339: Optimization to SimpleMessageStore#addPartitionMessages to
avoid getting the partition map for every vertex.

Modified:
    giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java?rev=1390018&r1=1390017&r2=1390018&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java Tue Sep 25 17:51:40 2012
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.comm.messages;
 
+import com.google.common.collect.MapMaker;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.graph.VertexCombiner;
@@ -74,7 +75,8 @@ public class SimpleMessageStore<I extend
       VertexCombiner<I, M> combiner,
       ImmutableClassesGiraphConfiguration<I, V, E, M> config) {
     this.service = service;
-    map = Maps.newConcurrentMap();
+    map = new MapMaker().concurrencyLevel(
+        config.getNettyServerExecutionConcurrency()).makeMap();
     this.combiner = combiner;
     this.config = config;
   }
@@ -85,8 +87,11 @@ public class SimpleMessageStore<I extend
     int partitionId = getPartitonId(vertexId);
     ConcurrentMap<I, Collection<M>> partitionMap = map.get(partitionId);
     if (partitionMap == null) {
-      partitionMap = map.putIfAbsent(partitionId,
-          Maps.<I, Collection<M>>newConcurrentMap());
+      ConcurrentMap<I, Collection<M>> tmpMap  =
+          new MapMaker().concurrencyLevel(
+              config.getNettyServerExecutionConcurrency()).
+              makeMap();
+      partitionMap = map.putIfAbsent(partitionId, tmpMap);
       if (partitionMap == null) {
         partitionMap = map.get(partitionId);
       }
@@ -113,7 +118,31 @@ public class SimpleMessageStore<I extend
   @Override
   public void addPartitionMessages(Map<I, Collection<M>> messages,
       int partitionId) throws IOException {
-    addMessages(messages);
+    ConcurrentMap<I, Collection<M>> partitionMap = map.get(partitionId);
+    if (partitionMap == null) {
+      ConcurrentMap<I, Collection<M>> tmpMap  =
+          new MapMaker().concurrencyLevel(
+              config.getNettyServerExecutionConcurrency()).
+              makeMap();
+      partitionMap = map.putIfAbsent(partitionId, tmpMap);
+      if (partitionMap == null) {
+        partitionMap = map.get(partitionId);
+      }
+    }
+
+    for (Entry<I, Collection<M>> entry : messages.entrySet()) {
+      Collection<M> currentMessages =
+          CollectionUtils.addConcurrent(
+              entry.getKey(), entry.getValue(), partitionMap);
+      if (combiner != null) {
+        synchronized (currentMessages) {
+          currentMessages =
+              Lists.newArrayList(combiner.combine(entry.getKey(),
+                  currentMessages));
+          partitionMap.put(entry.getKey(), currentMessages);
+        }
+      }
+    }
   }
 
   @Override