You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2011/10/10 09:38:15 UTC
svn commit: r1180805 - in /incubator/giraph/trunk: CHANGELOG
src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
Author: aching
Date: Mon Oct 10 07:38:15 2011
New Revision: 1180805
URL: http://svn.apache.org/viewvc?rev=1180805&view=rev
Log:
GIRAPH-48: numFlushThreads is 0 when doing a single worker
unittest. Changing the minimum to 1. (aching)
Modified:
incubator/giraph/trunk/CHANGELOG
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1180805&r1=1180804&r2=1180805&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Mon Oct 10 07:38:15 2011
@@ -2,7 +2,10 @@ Giraph Change Log
Release 0.70.0 - unreleased
- GIRAPH-44. Add documentation about counter limits in Hadoop 0.203+.
+ GIRAPH-48: numFlushThreads is 0 when doing a single worker
+ unittest. Changing the minimum to 1. (aching)
+
+ GIRAPH-44: Add documentation about counter limits in Hadoop 0.203+.
(mtiwari via jghoman)
GIRAPH-12: Investigate communication improvements. (hyunsik)
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1180805&r1=1180804&r2=1180805&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Mon Oct 10 07:38:15 2011
@@ -93,7 +93,7 @@ public abstract class BasicRPCCommunicat
/** Messages sent during the last superstep */
private long totalMsgsSentInSuperstep = 0;
/**
- * Map of the peer connections, mapping from remote socket address to client
+ * Map of the peer connections, mapping from remote socket address to client
* meta data
*/
private final Map<InetSocketAddress, PeerConnection> peerConnections =
@@ -101,7 +101,7 @@ public abstract class BasicRPCCommunicat
/**
* Thread pool for message flush threads
*/
- private final ExecutorService executor;
+ private final ExecutorService executor;
/**
* Map of outbound messages, mapping from remote server to
* destination vertex index to list of messages
@@ -151,9 +151,9 @@ public abstract class BasicRPCCommunicat
private final J jobToken;
/** maximum number of vertices sent in a single RPC */
private static final int MAX_VERTICES_PER_RPC = 1024;
-
+
/**
- * PeerConnection contains RPC client and accumulated messages
+ * PeerConnection contains RPC client and accumulated messages
* for a specific peer.
*/
private class PeerConnection {
@@ -162,7 +162,7 @@ public abstract class BasicRPCCommunicat
* mapping from vertex range (max vertex index) to list of messages.
* (Synchronized with itself).
*/
- private final Map<I, MsgList<M>> outMessagesPerPeer;
+ private final Map<I, MsgList<M>> outMessagesPerPeer;
/**
* Client interface: RPC proxy for remote server, this class for local
*/
@@ -170,16 +170,16 @@ public abstract class BasicRPCCommunicat
/** Maximum size of cached message list, before sending it out */
/** Boolean, set to false when local client (self), true otherwise */
private final boolean isProxy;
-
+
public PeerConnection(Map<I, MsgList<M>> m,
CommunicationsInterface<I, V, E, M> i,
boolean isProxy) {
-
+
this.outMessagesPerPeer = m;
this.peer = i;
this.isProxy = isProxy;
}
-
+
public void close() {
if (LOG.isDebugEnabled()) {
LOG.debug("close: Done");
@@ -190,7 +190,7 @@ public abstract class BasicRPCCommunicat
return peer;
}
}
-
+
private class PeerFlushExecutor implements Runnable {
PeerConnection peerConnection;
@@ -199,7 +199,7 @@ public abstract class BasicRPCCommunicat
}
@Override
- public void run() {
+ public void run() {
CommunicationsInterface<I, V, E, M> proxy
= peerConnection.getRPCProxy();
@@ -223,7 +223,7 @@ public abstract class BasicRPCCommunicat
proxy.getName() +
" putting (list) " + msgList +
" to " + e.getKey() +
- ", proxy = " +
+ ", proxy = " +
peerConnection.isProxy);
}
proxy.putMsgList(e.getKey(), msgList);
@@ -236,7 +236,7 @@ public abstract class BasicRPCCommunicat
+ proxy.getName() +
" putting " + msg +
" to " + e.getKey() +
- ", proxy = " +
+ ", proxy = " +
peerConnection.isProxy);
}
if (msg == null) {
@@ -257,7 +257,7 @@ public abstract class BasicRPCCommunicat
": all messages flushed");
}
} catch (IOException e) {
- LOG.error(e);
+ LOG.error(e);
if (peerConnection.isProxy) {
RPC.stopProxy(peerConnection.peer);
}
@@ -265,7 +265,7 @@ public abstract class BasicRPCCommunicat
}
}
}
-
+
/**
* LargeMessageFlushExecutor flushes all outgoing messages destined to some vertices.
* This is executed when the number of messages destined to certain vertex
@@ -273,7 +273,7 @@ public abstract class BasicRPCCommunicat
*/
private class LargeMessageFlushExecutor implements Runnable {
final I destVertex;
- final MsgList<M> outMessage;
+ final MsgList<M> outMessage;
PeerConnection peerConnection;
LargeMessageFlushExecutor(PeerConnection peerConnection, I destVertex) {
@@ -286,11 +286,11 @@ public abstract class BasicRPCCommunicat
}
@Override
- public void run() {
+ public void run() {
try {
- CommunicationsInterface<I, V, E, M> proxy =
+ CommunicationsInterface<I, V, E, M> proxy =
peerConnection.getRPCProxy();
-
+
if (combiner != null) {
M combinedMsg = combiner.combine(destVertex,
outMessage);
@@ -301,7 +301,7 @@ public abstract class BasicRPCCommunicat
proxy.putMsgList(destVertex, outMessage);
}
} catch (IOException e) {
- LOG.error(e);
+ LOG.error(e);
if (peerConnection.isProxy) {
RPC.stopProxy(peerConnection.peer);
}
@@ -311,7 +311,7 @@ public abstract class BasicRPCCommunicat
}
}
}
-
+
private void submitLargeMessageSend(InetSocketAddress addr, I destVertex) {
PeerConnection pc = peerConnections.get(addr);
executor.execute(new LargeMessageFlushExecutor(pc, destVertex));
@@ -367,20 +367,23 @@ public abstract class BasicRPCCommunicat
this.server.start();
this.myName = myAddress.toString();
-
+
int numWorkers = conf.getInt(GiraphJob.MAX_WORKERS, numTasks);
- // if the number of flush threads is unset, it is set to
- // the number of max workers.
- int numFlushThreads = conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS, numWorkers-1);
+ // If the number of flush threads is unset, it is set to
+ // the number of max workers - 1 or a minimum of 1.
+ int numFlushThreads =
+ Math.max(conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+ numWorkers - 1),
+ 1);
this.executor = Executors.newFixedThreadPool(numFlushThreads);
-
+
if (LOG.isInfoEnabled()) {
LOG.info("BasicRPCCommunications: Started RPC " +
"communication server: " + myName + " with " +
- numHandlers + " handlers and " + numFlushThreads +
+ numHandlers + " handlers and " + numFlushThreads +
" flush threads");
}
-
+
connectAllRPCProxys(this.jobId, this.jobToken);
}
@@ -649,7 +652,7 @@ end[HADOOP_FACEBOOK]*/
InetSocketAddress addr = getInetSocketAddress(vertexIndexMax);
CommunicationsInterface<I, V, E, M> rpcProxy =
peerConnections.get(addr).getRPCProxy();
-
+
if (LOG.isInfoEnabled()) {
LOG.info("sendVertexList: Sending to " + rpcProxy.getName() + " " +
addr + ", with vertex index " + vertexIndexMax +
@@ -812,7 +815,7 @@ end[HADOOP_FACEBOOK]*/
Collections.shuffle(peerList);
for (PeerConnection pc : peerList) {
- futures.add(executor.submit(new PeerFlushExecutor(pc)));
+ futures.add(executor.submit(new PeerFlushExecutor(pc)));
}
// wait for all flushes