You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2011/10/10 09:38:15 UTC

svn commit: r1180805 - in /incubator/giraph/trunk: CHANGELOG src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java

Author: aching
Date: Mon Oct 10 07:38:15 2011
New Revision: 1180805

URL: http://svn.apache.org/viewvc?rev=1180805&view=rev
Log:
GIRAPH-48: numFlushThreads is 0 when doing a single worker
unittest. Changing the minimum to 1. (aching)


Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1180805&r1=1180804&r2=1180805&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Mon Oct 10 07:38:15 2011
@@ -2,7 +2,10 @@ Giraph Change Log
 
 Release 0.70.0 - unreleased
 
-  GIRAPH-44. Add documentation about counter limits in Hadoop 0.203+.
+  GIRAPH-48: numFlushThreads is 0 when doing a single worker 
+  unittest. Changing the minimum to 1. (aching)
+
+  GIRAPH-44: Add documentation about counter limits in Hadoop 0.203+.
   (mtiwari via jghoman)
 
   GIRAPH-12: Investigate communication improvements. (hyunsik)

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1180805&r1=1180804&r2=1180805&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Mon Oct 10 07:38:15 2011
@@ -93,7 +93,7 @@ public abstract class BasicRPCCommunicat
     /** Messages sent during the last superstep */
     private long totalMsgsSentInSuperstep = 0;
     /**
-     * Map of the peer connections, mapping from remote socket address to client 
+     * Map of the peer connections, mapping from remote socket address to client
      * meta data
      */
     private final Map<InetSocketAddress, PeerConnection> peerConnections =
@@ -101,7 +101,7 @@ public abstract class BasicRPCCommunicat
     /**
      * Thread pool for message flush threads
      */
-    private final ExecutorService executor; 
+    private final ExecutorService executor;
     /**
      * Map of outbound messages, mapping from remote server to
      * destination vertex index to list of messages
@@ -151,9 +151,9 @@ public abstract class BasicRPCCommunicat
     private final J jobToken;
     /** maximum number of vertices sent in a single RPC */
     private static final int MAX_VERTICES_PER_RPC = 1024;
-    
+
     /**
-     * PeerConnection contains RPC client and accumulated messages 
+     * PeerConnection contains RPC client and accumulated messages
      * for a specific peer.
      */
     private class PeerConnection {
@@ -162,7 +162,7 @@ public abstract class BasicRPCCommunicat
          * mapping from vertex range (max vertex index) to list of messages.
          * (Synchronized with itself).
          */
-        private final Map<I, MsgList<M>> outMessagesPerPeer;        
+        private final Map<I, MsgList<M>> outMessagesPerPeer;
         /**
          * Client interface: RPC proxy for remote server, this class for local
          */
@@ -170,16 +170,16 @@ public abstract class BasicRPCCommunicat
         /** Maximum size of cached message list, before sending it out */
         /** Boolean, set to false when local client (self), true otherwise */
         private final boolean isProxy;
-        
+
         public PeerConnection(Map<I, MsgList<M>> m,
             CommunicationsInterface<I, V, E, M> i,
             boolean isProxy) {
-            
+
             this.outMessagesPerPeer = m;
             this.peer = i;
             this.isProxy = isProxy;
         }
-        
+
         public void close() {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("close: Done");
@@ -190,7 +190,7 @@ public abstract class BasicRPCCommunicat
             return peer;
         }
     }
-    
+
     private class PeerFlushExecutor implements Runnable {
         PeerConnection peerConnection;
 
@@ -199,7 +199,7 @@ public abstract class BasicRPCCommunicat
         }
 
         @Override
-        public void run() {            
+        public void run() {
             CommunicationsInterface<I, V, E, M> proxy
                 = peerConnection.getRPCProxy();
 
@@ -223,7 +223,7 @@ public abstract class BasicRPCCommunicat
                                             proxy.getName() +
                                             " putting (list) " + msgList +
                                             " to " + e.getKey() +
-                                            ", proxy = " + 
+                                            ", proxy = " +
                                             peerConnection.isProxy);
                                     }
                                     proxy.putMsgList(e.getKey(), msgList);
@@ -236,7 +236,7 @@ public abstract class BasicRPCCommunicat
                                             + proxy.getName() +
                                             " putting " + msg +
                                             " to " + e.getKey() +
-                                            ", proxy = " + 
+                                            ", proxy = " +
                                             peerConnection.isProxy);
                                     }
                                     if (msg == null) {
@@ -257,7 +257,7 @@ public abstract class BasicRPCCommunicat
                         ": all messages flushed");
                 }
             } catch (IOException e) {
-                LOG.error(e);                
+                LOG.error(e);
                 if (peerConnection.isProxy) {
                     RPC.stopProxy(peerConnection.peer);
                 }
@@ -265,7 +265,7 @@ public abstract class BasicRPCCommunicat
             }
         }
     }
-    
+
     /**
      * LargeMessageFlushExecutor flushes all outgoing messages destined to some vertices.
      * This is executed when the number of messages destined to certain vertex
@@ -273,7 +273,7 @@ public abstract class BasicRPCCommunicat
      */
     private class LargeMessageFlushExecutor implements Runnable {
         final I destVertex;
-        final MsgList<M> outMessage;        
+        final MsgList<M> outMessage;
         PeerConnection peerConnection;
 
         LargeMessageFlushExecutor(PeerConnection peerConnection, I destVertex) {
@@ -286,11 +286,11 @@ public abstract class BasicRPCCommunicat
         }
 
         @Override
-        public void run() {     
+        public void run() {
             try {
-                CommunicationsInterface<I, V, E, M> proxy = 
+                CommunicationsInterface<I, V, E, M> proxy =
                     peerConnection.getRPCProxy();
-                
+
                 if (combiner != null) {
                         M combinedMsg = combiner.combine(destVertex,
                                                          outMessage);
@@ -301,7 +301,7 @@ public abstract class BasicRPCCommunicat
                         proxy.putMsgList(destVertex, outMessage);
                     }
             } catch (IOException e) {
-                LOG.error(e);                
+                LOG.error(e);
                 if (peerConnection.isProxy) {
                     RPC.stopProxy(peerConnection.peer);
                 }
@@ -311,7 +311,7 @@ public abstract class BasicRPCCommunicat
             }
         }
     }
-    
+
     private void submitLargeMessageSend(InetSocketAddress addr, I destVertex) {
         PeerConnection pc = peerConnections.get(addr);
         executor.execute(new LargeMessageFlushExecutor(pc, destVertex));
@@ -367,20 +367,23 @@ public abstract class BasicRPCCommunicat
         this.server.start();
 
         this.myName = myAddress.toString();
-        
+
         int numWorkers = conf.getInt(GiraphJob.MAX_WORKERS, numTasks);
-        // if the number of flush threads is unset, it is set to 
-        // the number of max workers.
-        int numFlushThreads = conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS, numWorkers-1);
+        // Defaults to max workers - 1 when unset; always clamped to at
+        // least 1 since newFixedThreadPool() rejects a pool size of 0.
+        int numFlushThreads =
+             Math.max(conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+                                  numWorkers - 1),
+                      1);
         this.executor = Executors.newFixedThreadPool(numFlushThreads);
-        
+
         if (LOG.isInfoEnabled()) {
             LOG.info("BasicRPCCommunications: Started RPC " +
                      "communication server: " + myName + " with " +
-                     numHandlers + " handlers and " + numFlushThreads + 
+                     numHandlers + " handlers and " + numFlushThreads +
                      " flush threads");
         }
-        
+
         connectAllRPCProxys(this.jobId, this.jobToken);
     }
 
@@ -649,7 +652,7 @@ end[HADOOP_FACEBOOK]*/
         InetSocketAddress addr = getInetSocketAddress(vertexIndexMax);
         CommunicationsInterface<I, V, E, M> rpcProxy =
             peerConnections.get(addr).getRPCProxy();
-            
+
         if (LOG.isInfoEnabled()) {
             LOG.info("sendVertexList: Sending to " + rpcProxy.getName() + " " +
                      addr + ", with vertex index " + vertexIndexMax +
@@ -812,7 +815,7 @@ end[HADOOP_FACEBOOK]*/
         Collections.shuffle(peerList);
 
         for (PeerConnection pc : peerList) {
-            futures.add(executor.submit(new PeerFlushExecutor(pc)));    
+            futures.add(executor.submit(new PeerFlushExecutor(pc)));
         }
 
         // wait for all flushes