Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC
svn commit: r1245205 [3/18] - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/examples...
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Thu Feb 16 22:12:31 2012
@@ -39,7 +39,6 @@ import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -66,555 +65,623 @@ import com.google.common.collect.Iterabl
import org.apache.hadoop.ipc.ProtocolSignature;
end[HADOOP_FACEBOOK]*/
+/**
+ * Basic RPC communications object that implements the lower level operations
+ * for RPC communication.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ * @param <J> Job token
+ */
@SuppressWarnings("rawtypes")
-public abstract class BasicRPCCommunications<
- I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable, J>
- implements CommunicationsInterface<I, V, E, M>,
- ServerInterface<I, V, E, M> {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(BasicRPCCommunications.class);
- /** Indicates whether in superstep preparation */
- private boolean inPrepareSuperstep = false;
- /** Local hostname */
- private final String localHostname;
- /** Name of RPC server, == myAddress.toString() */
- private final String myName;
- /** RPC server */
- private Server server;
- /** Centralized service, needed to get vertex ranges */
- private final CentralizedServiceWorker<I, V, E, M> service;
- /** Hadoop configuration */
- protected final Configuration conf;
- /** Combiner instance, can be null */
- private final VertexCombiner<I, M> combiner;
- /** Address of RPC server */
- private InetSocketAddress myAddress;
- /** Messages sent during the last superstep */
- private long totalMsgsSentInSuperstep = 0;
- /** Maximum messages sent per putVertexIdMessagesList RPC */
- private final int maxMessagesPerFlushPut;
+public abstract class BasicRPCCommunications<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable, J>
+ implements CommunicationsInterface<I, V, E, M>,
+ ServerInterface<I, V, E, M> {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(BasicRPCCommunications.class);
+ /** Maximum number of vertices sent in a single RPC */
+ private static final int MAX_VERTICES_PER_RPC = 1024;
+ /** Hadoop configuration */
+ protected final Configuration conf;
+ /** Indicates whether in superstep preparation */
+ private boolean inPrepareSuperstep = false;
+ /** Local hostname */
+ private final String localHostname;
+ /** Name of RPC server, == myAddress.toString() */
+ private final String myName;
+ /** RPC server */
+ private Server server;
+ /** Centralized service, needed to get vertex ranges */
+ private final CentralizedServiceWorker<I, V, E, M> service;
+ /** Combiner instance, can be null */
+ private final VertexCombiner<I, M> combiner;
+ /** Address of RPC server */
+ private InetSocketAddress myAddress;
+ /** Messages sent during the last superstep */
+ private long totalMsgsSentInSuperstep = 0;
+ /** Maximum messages sent per putVertexIdMessagesList RPC */
+ private final int maxMessagesPerFlushPut;
+ /**
+ * Map of the peer connections, mapping from remote socket address to client
+ * meta data
+ */
+ private final Map<InetSocketAddress, PeerConnection> peerConnections =
+ new HashMap<InetSocketAddress, PeerConnection>();
+ /**
+ * Cached map of partition ids to remote socket address. Needs to be
+ * synchronized.
+ */
+ private final Map<Integer, InetSocketAddress> partitionIndexAddressMap =
+ new HashMap<Integer, InetSocketAddress>();
+ /**
+ * Thread pool for message flush threads
+ */
+ private final ExecutorService executor;
+ /**
+ * Map of outbound messages, mapping from remote server to
+ * destination vertex index to list of messages
+ * (Synchronized between peer threads and main thread for each internal
+ * map)
+ */
+ private final Map<InetSocketAddress, Map<I, MsgList<M>>> outMessages =
+ new HashMap<InetSocketAddress, Map<I, MsgList<M>>>();
+ /**
+ * Map of incoming messages, mapping from vertex index to list of messages.
+ * Only accessed by the main thread (no need to synchronize).
+ */
+ private final Map<I, List<M>> inMessages = new HashMap<I, List<M>>();
+ /**
+ * Map of inbound messages, mapping from vertex index to list of messages.
+ * Transferred to inMessages at beginning of a superstep. This
+ * intermediary step exists so that the combiner will run not only at the
+ * client, but also at the server. Also, allows the sending of large
+ * message lists during the superstep computation. (Synchronized)
+ */
+ private final Map<I, List<M>> transientInMessages =
+ new HashMap<I, List<M>>();
+ /**
+ * Map of partition ids to incoming vertices from other workers.
+ * (Synchronized)
+ */
+ private final Map<Integer, List<BasicVertex<I, V, E, M>>>
+ inPartitionVertexMap = new HashMap<Integer, List<BasicVertex<I, V, E, M>>>();
+
+ /**
+ * Map from vertex index to all vertex mutations
+ */
+ private final Map<I, VertexMutations<I, V, E, M>> inVertexMutationsMap =
+ new HashMap<I, VertexMutations<I, V, E, M>>();
+
+ /** Maximum size of cached message list, before sending it out */
+ private final int maxSize;
+ /** Cached job id */
+ private final String jobId;
+ /** Cached job token */
+ private final J jobToken;
+
+
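
Taken together, the maps above stage messages in three hops: sendMessageReq() caches a message in outMessages keyed by the destination worker's address, the flush executors RPC those batches into the remote worker's transientInMessages, and prepareSuperstep() moves (and optionally combines) them into inMessages for the next superstep's compute calls. A minimal standalone sketch of that staging, with plain String/Long stand-ins for the Writable generics (illustrative only, not the Giraph API):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    /** Simplified sketch of the three message-staging maps. */
    public class MessageStagingSketch {
      /** Destination worker address -> (vertex id -> outgoing messages). */
      private final Map<String, Map<Long, List<String>>> outMessages =
          new HashMap<String, Map<Long, List<String>>>();
      /** Filled by RPC handlers while the superstep is still running. */
      private final Map<Long, List<String>> transientInMessages =
          new HashMap<Long, List<String>>();
      /** What vertices actually read during the next superstep. */
      private final Map<Long, List<String>> inMessages =
          new HashMap<Long, List<String>>();

      /** Sender side: cache a message for a vertex owned by workerAddr. */
      void sendMessageReq(String workerAddr, long vertexId, String msg) {
        Map<Long, List<String>> perVertex = outMessages.get(workerAddr);
        if (perVertex == null) {
          perVertex = new HashMap<Long, List<String>>();
          outMessages.put(workerAddr, perVertex);
        }
        List<String> msgs = perVertex.get(vertexId);
        if (msgs == null) {
          msgs = new ArrayList<String>();
          perVertex.put(vertexId, msgs);
        }
        msgs.add(msg);
      }

      /** Receiver side: an RPC handler appends to the transient map. */
      void putMsgList(long vertexId, List<String> msgList) {
        List<String> msgs = transientInMessages.get(vertexId);
        if (msgs == null) {
          msgs = new ArrayList<String>(msgList.size());
          transientInMessages.put(vertexId, msgs);
        }
        msgs.addAll(msgList);
      }

      /** Between supersteps: promote transient messages to readable ones. */
      void prepareSuperstep() {
        for (Map.Entry<Long, List<String>> e : transientInMessages.entrySet()) {
          List<String> msgs = inMessages.get(e.getKey());
          if (msgs == null) {
            msgs = new ArrayList<String>();
            inMessages.put(e.getKey(), msgs);
          }
          msgs.addAll(e.getValue());
        }
        transientInMessages.clear();
      }
    }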
+ /**
+ * PeerConnection contains RPC client and accumulated messages
+ * for a specific peer.
+ */
+ private class PeerConnection {
/**
- * Map of the peer connections, mapping from remote socket address to client
- * meta data
+ * Map of outbound messages going to a particular remote server,
+ * mapping from the destination vertex to a list of messages.
+ * (Synchronized with itself).
*/
- private final Map<InetSocketAddress, PeerConnection> peerConnections =
- new HashMap<InetSocketAddress, PeerConnection>();
+ private final Map<I, MsgList<M>> outMessagesPerPeer;
/**
- * Cached map of partition ids to remote socket address. Needs to be
- * synchronized.
+ * Client interface: RPC proxy for remote server, this class for local
*/
- private final Map<Integer, InetSocketAddress> partitionIndexAddressMap =
- new HashMap<Integer, InetSocketAddress>();
- /**
- * Thread pool for message flush threads
- */
- private final ExecutorService executor;
- /**
- * Map of outbound messages, mapping from remote server to
- * destination vertex index to list of messages
- * (Synchronized between peer threads and main thread for each internal
- * map)
- */
- private final Map<InetSocketAddress, Map<I, MsgList<M>>> outMessages =
- new HashMap<InetSocketAddress, Map<I, MsgList<M>>>();
- /**
- * Map of incoming messages, mapping from vertex index to list of messages.
- * Only accessed by the main thread (no need to synchronize).
- */
- private final Map<I, List<M>> inMessages = new HashMap<I, List<M>>();
- /**
- * Map of inbound messages, mapping from vertex index to list of messages.
- * Transferred to inMessages at beginning of a superstep. This
- * intermediary step exists so that the combiner will run not only at the
- * client, but also at the server. Also, allows the sending of large
- * message lists during the superstep computation. (Synchronized)
- */
- private final Map<I, List<M>> transientInMessages =
- new HashMap<I, List<M>>();
- /**
- * Map of partition ids to incoming vertices from other workers.
- * (Synchronized)
- */
- private final Map<Integer, List<BasicVertex<I, V, E, M>>>
- inPartitionVertexMap =
- new HashMap<Integer, List<BasicVertex<I, V, E, M>>>();
-
- /**
- * Map from vertex index to all vertex mutations
- */
- private final Map<I, VertexMutations<I, V, E, M>>
- inVertexMutationsMap =
- new HashMap<I, VertexMutations<I, V, E, M>>();
-
- /** Maximum size of cached message list, before sending it out */
- private final int maxSize;
- /** Cached job id */
- private final String jobId;
- /** Cached job token */
- private final J jobToken;
- /** Maximum number of vertices sent in a single RPC */
- private static final int MAX_VERTICES_PER_RPC = 1024;
+ private final CommunicationsInterface<I, V, E, M> peer;
+ /** Boolean, set to false when local client (self), true otherwise */
+ private final boolean isProxy;
/**
- * PeerConnection contains RPC client and accumulated messages
- * for a specific peer.
+ * Constructor
+ * @param idMessageMap Map of vertex id to message list
+ * @param peerConnection Peer connection
+ * @param isProxy Is this a proxy (true) or local (false)?
*/
- private class PeerConnection {
- /**
- * Map of outbound messages going to a particular remote server,
- * mapping from the destination vertex to a list of messages.
- * (Synchronized with itself).
- */
- private final Map<I, MsgList<M>> outMessagesPerPeer;
- /**
- * Client interface: RPC proxy for remote server, this class for local
- */
- private final CommunicationsInterface<I, V, E, M> peer;
- /** Boolean, set to false when local client (self), true otherwise */
- private final boolean isProxy;
-
- public PeerConnection(Map<I, MsgList<M>> m,
- CommunicationsInterface<I, V, E, M> i,
- boolean isProxy) {
-
- this.outMessagesPerPeer = m;
- this.peer = i;
- this.isProxy = isProxy;
- }
-
- public void close() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("close: Done");
- }
- }
-
- public CommunicationsInterface<I, V, E, M> getRPCProxy() {
- return peer;
- }
-
- @Override
- public String toString() {
- return peer.getName() + ", proxy=" + isProxy;
- }
- }
-
- private class PeerFlushExecutor implements Runnable {
- private final PeerConnection peerConnection;
- private final Mapper<?, ?, ?, ?>.Context context;
- // Report on the status of this flusher if this interval was exceeded
- private static final int REPORTING_INTERVAL_MIN_MILLIS = 60000;
-
- PeerFlushExecutor(PeerConnection peerConnection,
- Mapper<?, ?, ?, ?>.Context context) {
- this.peerConnection = peerConnection;
- this.context = context;
- }
-
- @Override
- public void run() {
- CommunicationsInterface<I, V, E, M> proxy
- = peerConnection.getRPCProxy();
- long startMillis = System.currentTimeMillis();
- long lastReportedMillis = startMillis;
- try {
- int verticesDone = 0;
- synchronized(peerConnection.outMessagesPerPeer) {
- final int vertices =
- peerConnection.outMessagesPerPeer.size();
- // 1. Check for null messages and combine if possible
- // 2. Send vertex ids and messages in bulk to the
- // destination servers.
- for (Entry<I, MsgList<M>> entry :
- peerConnection.outMessagesPerPeer.entrySet()) {
- for (M msg : entry.getValue()) {
- if (msg == null) {
- throw new IllegalArgumentException(
- "run: Cannot put null message on " +
- "vertex id " + entry.getKey());
- }
- }
- if (combiner != null && entry.getValue().size() > 1) {
- Iterable<M> messages = combiner.combine(
- entry.getKey(), entry.getValue());
- if (messages == null) {
- throw new IllegalStateException(
- "run: Combiner cannot return null");
- }
- if (Iterables.size(entry.getValue()) <
- Iterables.size(messages)) {
- throw new IllegalStateException(
- "run: The number of combined " +
- "messages is required to be <= to " +
- "number of messages to be combined");
- }
- entry.getValue().clear();
- for (M msg: messages) {
- entry.getValue().add(msg);
- }
- }
- if (entry.getValue().isEmpty()) {
- throw new IllegalStateException(
- "run: Impossible for no messages in " +
- entry.getKey());
- }
- }
- while (!peerConnection.outMessagesPerPeer.isEmpty()) {
- int bulkedMessages = 0;
- Iterator<Entry<I, MsgList<M>>> vertexIdMessagesListIt =
- peerConnection.outMessagesPerPeer.entrySet().
- iterator();
- VertexIdMessagesList<I, M> vertexIdMessagesList =
- new VertexIdMessagesList<I, M>();
- while (vertexIdMessagesListIt.hasNext()) {
- Entry<I, MsgList<M>> entry =
- vertexIdMessagesListIt.next();
- // Add this entry if the list is empty or we
- // haven't reached the maximum number of messages
- if (vertexIdMessagesList.isEmpty() ||
- ((bulkedMessages + entry.getValue().size())
- < maxMessagesPerFlushPut)) {
- vertexIdMessagesList.add(
- new VertexIdMessages<I, M>(
- entry.getKey(), entry.getValue()));
- bulkedMessages += entry.getValue().size();
- }
- }
-
- // Clean up references to the vertex id and messages
- for (VertexIdMessages<I, M>vertexIdMessages :
- vertexIdMessagesList) {
- peerConnection.outMessagesPerPeer.remove(
- vertexIdMessages.getVertexId());
- }
-
- proxy.putVertexIdMessagesList(vertexIdMessagesList);
- context.progress();
-
- verticesDone += vertexIdMessagesList.size();
- long curMillis = System.currentTimeMillis();
- if ((lastReportedMillis +
- REPORTING_INTERVAL_MIN_MILLIS) < curMillis) {
- lastReportedMillis = curMillis;
- if (LOG.isInfoEnabled()) {
- float percentDone =
- (100f * verticesDone) /
- vertices;
- float minutesUsed =
- (curMillis - startMillis) / 1000f / 60f;
- float minutesRemaining =
- (minutesUsed * 100f / percentDone) -
- minutesUsed;
- LOG.info("run: " + peerConnection + ", " +
- verticesDone + " out of " +
- vertices +
- " done in " + minutesUsed +
- " minutes, " +
- percentDone + "% done, ETA " +
- minutesRemaining +
- " minutes remaining, " +
- MemoryUtils.getRuntimeMemoryStats());
- }
- }
- }
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("run: " + proxy.getName() +
- ": all messages flushed");
- }
- } catch (IOException e) {
- LOG.error(e);
- if (peerConnection.isProxy) {
- RPC.stopProxy(peerConnection.peer);
- }
- throw new RuntimeException(e);
- }
- }
+ public PeerConnection(Map<I, MsgList<M>> idMessageMap,
+ CommunicationsInterface<I, V, E, M> peerConnection,
+ boolean isProxy) {
+
+ this.outMessagesPerPeer = idMessageMap;
+ this.peer = peerConnection;
+ this.isProxy = isProxy;
}
/**
- * LargeMessageFlushExecutor flushes all outgoing messages destined to some vertices.
- * This is executed when the number of messages destined to certain vertex
- * exceeds <i>maxSize</i>.
+ * Nothing to clean up here, just log.
*/
- private class LargeMessageFlushExecutor implements Runnable {
- private final I destVertex;
- private final MsgList<M> outMessageList;
- private PeerConnection peerConnection;
-
- LargeMessageFlushExecutor(PeerConnection peerConnection, I destVertex) {
- this.peerConnection = peerConnection;
- synchronized(peerConnection.outMessagesPerPeer) {
- this.destVertex = destVertex;
- outMessageList =
- peerConnection.outMessagesPerPeer.get(destVertex);
- peerConnection.outMessagesPerPeer.remove(destVertex);
- }
- }
-
- @Override
- public void run() {
- try {
- CommunicationsInterface<I, V, E, M> proxy =
- peerConnection.getRPCProxy();
-
- if (combiner != null) {
- Iterable<M> messages = combiner.combine(destVertex,
- outMessageList);
- if (messages == null) {
- throw new IllegalStateException(
- "run: Combiner cannot return null");
- }
- if (Iterables.size(outMessageList) <
- Iterables.size(messages)) {
- throw new IllegalStateException(
- "run: The number of combined messages is " +
- "required to be <= to the number of " +
- "messages to be combined");
- }
- for (M msg: messages) {
- proxy.putMsg(destVertex, msg);
- }
- } else {
- proxy.putMsgList(destVertex, outMessageList);
- }
- } catch (IOException e) {
- LOG.error(e);
- if (peerConnection.isProxy) {
- RPC.stopProxy(peerConnection.peer);
- }
- throw new RuntimeException("run: Got IOException", e);
- } finally {
- outMessageList.clear();
- }
- }
- }
-
- private void submitLargeMessageSend(InetSocketAddress addr, I destVertex) {
- PeerConnection pc = peerConnections.get(addr);
- executor.execute(new LargeMessageFlushExecutor(pc, destVertex));
+ public void close() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("close: Done");
+ }
}
- protected abstract J createJobToken() throws IOException;
-
- protected abstract Server getRPCServer(
- InetSocketAddress addr,
- int numHandlers, String jobId, J jobToken) throws IOException;
-
/**
- * Only constructor.
+ * Get the RPC proxy of this connection.
*
- * @param context Context for getting configuration
- * @param service Service worker to get the vertex ranges
- * @throws IOException
- * @throws UnknownHostException
- * @throws InterruptedException
+ * @return RPC proxy of this connection.
*/
- public BasicRPCCommunications(Mapper<?, ?, ?, ?>.Context context,
- CentralizedServiceWorker<I, V, E, M> service)
- throws IOException, UnknownHostException, InterruptedException {
- this.service = service;
- this.conf = context.getConfiguration();
- this.maxSize = conf.getInt(GiraphJob.MSG_SIZE,
- GiraphJob.MSG_SIZE_DEFAULT);
- this.maxMessagesPerFlushPut =
- conf.getInt(GiraphJob.MAX_MESSAGES_PER_FLUSH_PUT,
- GiraphJob.DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT);
- if (BspUtils.getVertexCombinerClass(conf) == null) {
- this.combiner = null;
- } else {
- this.combiner = BspUtils.createVertexCombiner(conf);
- }
-
- this.localHostname = InetAddress.getLocalHost().getHostName();
- int taskId = conf.getInt("mapred.task.partition", -1);
- int numTasks = conf.getInt("mapred.map.tasks", 1);
-
-
-
- int numHandlers = conf.getInt(GiraphJob.RPC_NUM_HANDLERS,
- GiraphJob.RPC_NUM_HANDLERS_DEFAULT);
- if (numTasks < numHandlers) {
- numHandlers = numTasks;
- }
- this.jobToken = createJobToken();
- this.jobId = context.getJobID().toString();
-
- int numWorkers = conf.getInt(GiraphJob.MAX_WORKERS, numTasks);
- // If the number of flush threads is unset, it is set to
- // the number of max workers - 1 or a minimum of 1.
- int numFlushThreads =
- Math.max(conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
- numWorkers - 1),
- 1);
- this.executor = Executors.newFixedThreadPool(numFlushThreads);
-
- // Simple handling of port collisions on the same machine while
- // preserving debugability from the port number alone.
- // Round up the max number of workers to the next power of 10 and use
- // it as a constant to increase the port number with.
- int portIncrementConstant =
- (int) Math.pow(10, Math.ceil(Math.log10(numWorkers)));
- String bindAddress = localHostname;
- int bindPort = conf.getInt(GiraphJob.RPC_INITIAL_PORT,
- GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
- taskId;
- int bindAttempts = 0;
- final int maxRpcPortBindAttempts =
- conf.getInt(GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS,
- GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT);
- while (bindAttempts < maxRpcPortBindAttempts) {
- this.myAddress = new InetSocketAddress(bindAddress, bindPort);
- try {
- this.server =
- getRPCServer(
- myAddress, numHandlers, this.jobId, this.jobToken);
- break;
- } catch (BindException e) {
- LOG.info("BasicRPCCommunications: Failed to bind with port " +
- bindPort + " on bind attempt " + bindAttempts);
- ++bindAttempts;
- bindPort += portIncrementConstant;
- }
- }
- if (bindAttempts == maxRpcPortBindAttempts) {
- throw new IllegalStateException(
- "BasicRPCCommunications: Failed to start RPCServer with " +
- maxRpcPortBindAttempts + " attempts");
- }
-
- this.server.start();
- this.myName = myAddress.toString();
+ public CommunicationsInterface<I, V, E, M> getRPCProxy() {
+ return peer;
+ }
- if (LOG.isInfoEnabled()) {
- LOG.info("BasicRPCCommunications: Started RPC " +
- "communication server: " + myName + " with " +
- numHandlers + " handlers and " + numFlushThreads +
- " flush threads on bind attempt " + bindAttempts);
- }
+ @Override
+ public String toString() {
+ return peer.getName() + ", proxy=" + isProxy;
}
+ }
+
+ /**
+ * Runnable to flush messages to a given connection.
+ */
+ private class PeerFlushExecutor implements Runnable {
+ /** Report on the status of this flusher if this interval was exceeded */
+ private static final int REPORTING_INTERVAL_MIN_MILLIS = 60000;
+ /** Connection to send the messages to. */
+ private final PeerConnection peerConnection;
+ /** Saved context. */
+ private final Mapper<?, ?, ?, ?>.Context context;
/**
- * Get the final port of the RPC server that it bound to.
+ * Constructor.
*
- * @return Port that RPC server was bound to.
+ * @param peerConnection Connection to send the messages to.
+ * @param context Context of the mapper.
*/
- public int getPort() {
- return myAddress.getPort();
+ PeerFlushExecutor(PeerConnection peerConnection,
+ Mapper<?, ?, ?, ?>.Context context) {
+ this.peerConnection = peerConnection;
+ this.context = context;
}
@Override
- public void setup() {
- try {
- connectAllRPCProxys(this.jobId, this.jobToken);
- } catch (IOException e) {
- throw new IllegalStateException("setup: Got IOException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException("setup: Got InterruptedException",
- e);
+ public void run() {
+ CommunicationsInterface<I, V, E, M> proxy = peerConnection.getRPCProxy();
+ long startMillis = System.currentTimeMillis();
+ long lastReportedMillis = startMillis;
+ try {
+ int verticesDone = 0;
+ synchronized (peerConnection.outMessagesPerPeer) {
+ final int vertices =
+ peerConnection.outMessagesPerPeer.size();
+ // 1. Check for null messages and combine if possible
+ // 2. Send vertex ids and messages in bulk to the
+ // destination servers.
+ for (Entry<I, MsgList<M>> entry :
+ peerConnection.outMessagesPerPeer.entrySet()) {
+ for (M msg : entry.getValue()) {
+ if (msg == null) {
+ throw new IllegalArgumentException(
+ "run: Cannot put null message on " +
+ "vertex id " + entry.getKey());
+ }
+ }
+ if (combiner != null && entry.getValue().size() > 1) {
+ Iterable<M> messages = combiner.combine(
+ entry.getKey(), entry.getValue());
+ if (messages == null) {
+ throw new IllegalStateException(
+ "run: Combiner cannot return null");
+ }
+ if (Iterables.size(entry.getValue()) <
+ Iterables.size(messages)) {
+ throw new IllegalStateException(
+ "run: The number of combined " +
+ "messages is required to be <= to " +
+ "number of messages to be combined");
+ }
+ entry.getValue().clear();
+ for (M msg: messages) {
+ entry.getValue().add(msg);
+ }
+ }
+ if (entry.getValue().isEmpty()) {
+ throw new IllegalStateException(
+ "run: Impossible for no messages in " +
+ entry.getKey());
+ }
+ }
+ while (!peerConnection.outMessagesPerPeer.isEmpty()) {
+ int bulkedMessages = 0;
+ Iterator<Entry<I, MsgList<M>>> vertexIdMessagesListIt =
+ peerConnection.outMessagesPerPeer.entrySet().
+ iterator();
+ VertexIdMessagesList<I, M> vertexIdMessagesList =
+ new VertexIdMessagesList<I, M>();
+ while (vertexIdMessagesListIt.hasNext()) {
+ Entry<I, MsgList<M>> entry =
+ vertexIdMessagesListIt.next();
+ // Add this entry if the list is empty or we
+ // haven't reached the maximum number of messages
+ if (vertexIdMessagesList.isEmpty() ||
+ ((bulkedMessages + entry.getValue().size()) <
+ maxMessagesPerFlushPut)) {
+ vertexIdMessagesList.add(
+ new VertexIdMessages<I, M>(
+ entry.getKey(), entry.getValue()));
+ bulkedMessages += entry.getValue().size();
+ }
+ }
+
+ // Clean up references to the vertex id and messages
+ for (VertexIdMessages<I, M> vertexIdMessages :
+ vertexIdMessagesList) {
+ peerConnection.outMessagesPerPeer.remove(
+ vertexIdMessages.getVertexId());
+ }
+
+ proxy.putVertexIdMessagesList(vertexIdMessagesList);
+ context.progress();
+
+ verticesDone += vertexIdMessagesList.size();
+ long curMillis = System.currentTimeMillis();
+ if ((lastReportedMillis +
+ REPORTING_INTERVAL_MIN_MILLIS) < curMillis) {
+ lastReportedMillis = curMillis;
+ if (LOG.isInfoEnabled()) {
+ float percentDone =
+ (100f * verticesDone) /
+ vertices;
+ float minutesUsed =
+ (curMillis - startMillis) / 1000f / 60f;
+ float minutesRemaining =
+ (minutesUsed * 100f / percentDone) -
+ minutesUsed;
+ LOG.info("run: " + peerConnection + ", " +
+ verticesDone + " out of " +
+ vertices +
+ " done in " + minutesUsed +
+ " minutes, " +
+ percentDone + "% done, ETA " +
+ minutesRemaining +
+ " minutes remaining, " +
+ MemoryUtils.getRuntimeMemoryStats());
+ }
+ }
+ }
}
- }
-
- protected abstract CommunicationsInterface<I, V, E, M> getRPCProxy(
- final InetSocketAddress addr, String jobId, J jobToken)
- throws IOException, InterruptedException;
- /**
- * Establish connections to every RPC proxy server that will be used in
- * the upcoming messaging. This method is idempotent.
- *
- * @param jobId Stringified job id
- * @param jobToken used for
- * @throws InterruptedException
- * @throws IOException
- */
- private void connectAllRPCProxys(String jobId, J jobToken)
- throws IOException, InterruptedException {
- final int maxTries = 5;
- for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
- int tries = 0;
- while (tries < maxTries) {
- try {
- startPeerConnectionThread(
- partitionOwner.getWorkerInfo(), jobId, jobToken);
- break;
- } catch (IOException e) {
- LOG.warn("connectAllRPCProxys: Failed on attempt " +
- tries + " of " + maxTries +
- " to connect to " + partitionOwner.toString(), e);
- ++tries;
- }
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("run: " + proxy.getName() +
+ ": all messages flushed");
}
- }
+ } catch (IOException e) {
+ LOG.error(e);
+ if (peerConnection.isProxy) {
+ RPC.stopProxy(peerConnection.peer);
+ }
+ throw new RuntimeException(e);
+ }
+ }
+ }
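
The batching loop in PeerFlushExecutor.run() always ships whole per-vertex lists: the first entry of a batch is always taken, and further entries are added only while the running total stays under maxMessagesPerFlushPut, so a single oversized list still goes out in its own RPC. A small self-contained sketch of that chunking rule (names and types are illustrative, not the Giraph API):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    /** Illustrative chunking of per-vertex message lists into bounded RPC batches. */
    public class FlushBatchingSketch {
      /** Number of messages each putVertexIdMessagesList-style call would carry. */
      static List<Integer> batchSizes(Map<Long, Integer> msgsPerVertex,
          int maxPerFlushPut) {
        List<Integer> sizes = new ArrayList<Integer>();
        Map<Long, Integer> remaining = new HashMap<Long, Integer>(msgsPerVertex);
        while (!remaining.isEmpty()) {
          int bulked = 0;
          List<Long> taken = new ArrayList<Long>();
          for (Map.Entry<Long, Integer> e : remaining.entrySet()) {
            // The first entry is always taken; later entries only while under the cap.
            if (taken.isEmpty() || bulked + e.getValue() < maxPerFlushPut) {
              bulked += e.getValue();
              taken.add(e.getKey());
            }
          }
          for (Long vertexId : taken) {   // mirror of the "clean up references" step
            remaining.remove(vertexId);
          }
          sizes.add(bulked);              // one RPC per batch
        }
        return sizes;
      }

      public static void main(String[] args) {
        Map<Long, Integer> perVertex = new HashMap<Long, Integer>();
        perVertex.put(1L, 400);
        perVertex.put(2L, 700);
        perVertex.put(3L, 50);
        // With a cap of 1000 this produces two RPCs, e.g. [450, 700]
        // (the exact split depends on HashMap iteration order).
        System.out.println(batchSizes(perVertex, 1000));
      }
    }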
+
+ /**
+ * LargeMessageFlushExecutor flushes all outgoing messages destined to
+ * some vertices. This is executed when the number of messages destined to
+ * certain vertex exceeds <i>maxSize</i>.
+ */
+ private class LargeMessageFlushExecutor implements Runnable {
+ /** Destination vertex of message. */
+ private final I destVertex;
+ /** List of messages to the destination vertex */
+ private final MsgList<M> outMessageList;
+ /** Connection to send the message to. */
+ private PeerConnection peerConnection;
/**
- * Creates the connections to remote RPCs if any only if the inet socket
- * address doesn't already exist.
+ * Constructor of the executor for flushing large messages.
*
- * @param workerInfo My worker info
- * @param jobId Id of the job
- * @param jobToken Required for secure Hadoop
- * @throws IOException
- * @throws InterruptedException
+ * @param peerConnection Connection to send the message to.
+ * @param destVertex Destination vertex of message.
*/
- private void startPeerConnectionThread(WorkerInfo workerInfo,
- String jobId,
- J jobToken)
- throws IOException, InterruptedException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("startPeerConnectionThread: hostname " +
- workerInfo.getHostname() + ", port " +
- workerInfo.getPort());
- }
- final InetSocketAddress addr =
- new InetSocketAddress(workerInfo.getHostname(),
- workerInfo.getPort());
- // Cheap way to hold both the hostname and port (rather than
- // make a class)
- InetSocketAddress addrUnresolved =
- InetSocketAddress.createUnresolved(addr.getHostName(),
- addr.getPort());
- Map<I, MsgList<M>> outMsgMap = null;
- boolean isProxy = true;
- CommunicationsInterface<I, V, E, M> peer = this;
- synchronized(outMessages) {
- outMsgMap = outMessages.get(addrUnresolved);
- if (LOG.isDebugEnabled()) {
- LOG.debug("startPeerConnectionThread: Connecting to " +
- workerInfo.toString() + ", addr = " + addr +
- " if outMsgMap (" + outMsgMap + ") == null ");
- }
- if (outMsgMap != null) { // this host has already been added
- return;
- }
-
- if (myName.equals(addr.toString())) {
- isProxy = false;
- } else {
- peer = getRPCProxy(addr, jobId, jobToken);
- }
-
- outMsgMap = new HashMap<I, MsgList<M>>();
- outMessages.put(addrUnresolved, outMsgMap);
- }
-
- PeerConnection peerConnection =
- new PeerConnection(outMsgMap, peer, isProxy);
- peerConnections.put(addrUnresolved, peerConnection);
+ LargeMessageFlushExecutor(PeerConnection peerConnection, I destVertex) {
+ this.peerConnection = peerConnection;
+ synchronized (peerConnection.outMessagesPerPeer) {
+ this.destVertex = destVertex;
+ outMessageList =
+ peerConnection.outMessagesPerPeer.get(destVertex);
+ peerConnection.outMessagesPerPeer.remove(destVertex);
+ }
}
@Override
- public final long getProtocolVersion(String protocol, long clientVersion)
- throws IOException {
- return versionID;
- }
+ public void run() {
+ try {
+ CommunicationsInterface<I, V, E, M> proxy =
+ peerConnection.getRPCProxy();
+
+ if (combiner != null) {
+ Iterable<M> messages = combiner.combine(destVertex,
+ outMessageList);
+ if (messages == null) {
+ throw new IllegalStateException(
+ "run: Combiner cannot return null");
+ }
+ if (Iterables.size(outMessageList) <
+ Iterables.size(messages)) {
+ throw new IllegalStateException(
+ "run: The number of combined messages is " +
+ "required to be <= to the number of " +
+ "messages to be combined");
+ }
+ for (M msg: messages) {
+ proxy.putMsg(destVertex, msg);
+ }
+ } else {
+ proxy.putMsgList(destVertex, outMessageList);
+ }
+ } catch (IOException e) {
+ LOG.error(e);
+ if (peerConnection.isProxy) {
+ RPC.stopProxy(peerConnection.peer);
+ }
+ throw new RuntimeException("run: Got IOException", e);
+ } finally {
+ outMessageList.clear();
+ }
+ }
+ }
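
Both flush paths enforce the same two combiner rules seen above: combine() may not return null, and it may not return more messages than it was given. A hypothetical combiner that sums long-valued messages, shrinking each list to a single message, satisfies both (plain Java types here, not Giraph's Writable-based VertexCombiner API):

    import java.util.ArrayList;
    import java.util.List;

    /** Hypothetical combiner showing the "combined size <= original size" invariant. */
    public class SumCombinerSketch {
      /** Combine many long-valued messages for one vertex into a single sum. */
      static List<Long> combine(long vertexId, List<Long> messages) {
        long sum = 0;
        for (Long msg : messages) {
          sum += msg;
        }
        List<Long> combined = new ArrayList<Long>(1);
        combined.add(sum);
        return combined;   // size 1 <= messages.size(), never null: both checks pass
      }
    }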
+
+ /**
+ * Only constructor.
+ *
+ * @param context Context for getting configuration
+ * @param service Service worker to get the vertex ranges
+ * @throws IOException
+ * @throws UnknownHostException
+ * @throws InterruptedException
+ */
+ public BasicRPCCommunications(Mapper<?, ?, ?, ?>.Context context,
+ CentralizedServiceWorker<I, V, E, M> service)
+ throws IOException, InterruptedException {
+ this.service = service;
+ this.conf = context.getConfiguration();
+ this.maxSize = conf.getInt(GiraphJob.MSG_SIZE,
+ GiraphJob.MSG_SIZE_DEFAULT);
+ this.maxMessagesPerFlushPut =
+ conf.getInt(GiraphJob.MAX_MESSAGES_PER_FLUSH_PUT,
+ GiraphJob.DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT);
+ if (BspUtils.getVertexCombinerClass(conf) == null) {
+ this.combiner = null;
+ } else {
+ this.combiner = BspUtils.createVertexCombiner(conf);
+ }
+
+ this.localHostname = InetAddress.getLocalHost().getHostName();
+ int taskId = conf.getInt("mapred.task.partition", -1);
+ int numTasks = conf.getInt("mapred.map.tasks", 1);
+
+
+
+ int numHandlers = conf.getInt(GiraphJob.RPC_NUM_HANDLERS,
+ GiraphJob.RPC_NUM_HANDLERS_DEFAULT);
+ if (numTasks < numHandlers) {
+ numHandlers = numTasks;
+ }
+ this.jobToken = createJobToken();
+ this.jobId = context.getJobID().toString();
+
+ int numWorkers = conf.getInt(GiraphJob.MAX_WORKERS, numTasks);
+ // If the number of flush threads is unset, it is set to
+ // the number of max workers - 1 or a minimum of 1.
+ int numFlushThreads =
+ Math.max(conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+ numWorkers - 1),
+ 1);
+ this.executor = Executors.newFixedThreadPool(numFlushThreads);
+
+ // Simple handling of port collisions on the same machine while
+ // preserving debuggability from the port number alone.
+ // Round up the max number of workers to the next power of 10 and use
+ // it as a constant to increase the port number with.
+ int portIncrementConstant =
+ (int) Math.pow(10, Math.ceil(Math.log10(numWorkers)));
+ String bindAddress = localHostname;
+ int bindPort = conf.getInt(GiraphJob.RPC_INITIAL_PORT,
+ GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
+ taskId;
+ int bindAttempts = 0;
+ final int maxRpcPortBindAttempts =
+ conf.getInt(GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS,
+ GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT);
+ while (bindAttempts < maxRpcPortBindAttempts) {
+ this.myAddress = new InetSocketAddress(bindAddress, bindPort);
+ try {
+ this.server =
+ getRPCServer(
+ myAddress, numHandlers, this.jobId, this.jobToken);
+ break;
+ } catch (BindException e) {
+ LOG.info("BasicRPCCommunications: Failed to bind with port " +
+ bindPort + " on bind attempt " + bindAttempts);
+ ++bindAttempts;
+ bindPort += portIncrementConstant;
+ }
+ }
+ if (bindAttempts == maxRpcPortBindAttempts) {
+ throw new IllegalStateException(
+ "BasicRPCCommunications: Failed to start RPCServer with " +
+ maxRpcPortBindAttempts + " attempts");
+ }
+
+ this.server.start();
+ this.myName = myAddress.toString();
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("BasicRPCCommunications: Started RPC " +
+ "communication server: " + myName + " with " +
+ numHandlers + " handlers and " + numFlushThreads +
+ " flush threads on bind attempt " + bindAttempts);
+ }
+ }
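
A worked example of the port scheme in the constructor: the increment constant is the next power of ten at or above the worker count, so every bind retry keeps the task id readable in the low digits of the port. The numbers below are assumed, not Giraph defaults:

    /** Worked example of the port-collision handling above (illustrative values). */
    public class PortSchemeSketch {
      public static void main(String[] args) {
        int initialPort = 30000;   // stand-in for GiraphJob.RPC_INITIAL_PORT
        int numWorkers = 250;
        int taskId = 17;
        int portIncrementConstant =
            (int) Math.pow(10, Math.ceil(Math.log10(numWorkers)));   // 1000
        int bindPort = initialPort + taskId;
        // Successive bind attempts: 30017, 31017, 32017, ... -- the task id (17)
        // stays visible in the low digits because the increment is a power of ten.
        for (int attempt = 0; attempt < 3; attempt++) {
          System.out.println("attempt " + attempt + ": port " + bindPort);
          bindPort += portIncrementConstant;
        }
      }
    }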
+
+ /**
+ * Submit a large message to be sent.
+ *
+ * @param addr Message destination.
+ * @param destVertex Index of the destination vertex.
+ */
+ private void submitLargeMessageSend(InetSocketAddress addr, I destVertex) {
+ PeerConnection pc = peerConnections.get(addr);
+ executor.execute(new LargeMessageFlushExecutor(pc, destVertex));
+ }
+
+ /**
+ * Create the job token.
+ *
+ * @return Job token.
+ * @throws IOException
+ */
+ protected abstract J createJobToken() throws IOException;
+
+ /**
+ * Get the RPC server.
+ * @param addr Server address.
+ * @param numHandlers Number of handlers.
+ * @param jobId Job id.
+ * @param jobToken Job token.
+ * @return RPC server.
+ * @throws IOException
+ */
+ protected abstract Server getRPCServer(InetSocketAddress addr,
+ int numHandlers, String jobId, J jobToken) throws IOException;
+
+ /**
+ * Get the final port that the RPC server bound to.
+ *
+ * @return Port that RPC server was bound to.
+ */
+ public int getPort() {
+ return myAddress.getPort();
+ }
+
+ @Override
+ public void setup() {
+ try {
+ connectAllRPCProxys(this.jobId, this.jobToken);
+ } catch (IOException e) {
+ throw new IllegalStateException("setup: Got IOException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("setup: Got InterruptedException",
+ e);
+ }
+ }
+
+ /**
+ * Get the RPC proxy (handled by subclasses)
+ *
+ * @param addr Socket address.
+ * @param jobId Job id.
+ * @param jobToken Job token (if any)
+ * @return The RPC proxy.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ protected abstract CommunicationsInterface<I, V, E, M> getRPCProxy(
+ final InetSocketAddress addr, String jobId, J jobToken)
+ throws IOException, InterruptedException;
+
+ /**
+ * Establish connections to every RPC proxy server that will be used in
+ * the upcoming messaging. This method is idempotent.
+ *
+ * @param jobId Stringified job id
+ * @param jobToken Token required for secure Hadoop (if any)
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ private void connectAllRPCProxys(String jobId, J jobToken)
+ throws IOException, InterruptedException {
+ final int maxTries = 5;
+ for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
+ int tries = 0;
+ while (tries < maxTries) {
+ try {
+ startPeerConnectionThread(
+ partitionOwner.getWorkerInfo(), jobId, jobToken);
+ break;
+ } catch (IOException e) {
+ LOG.warn("connectAllRPCProxys: Failed on attempt " +
+ tries + " of " + maxTries +
+ " to connect to " + partitionOwner.toString(), e);
+ ++tries;
+ }
+ }
+ }
+ }
+
+ /**
+ * Creates the connection to a remote RPC server only if a connection for
+ * that inet socket address doesn't already exist.
+ *
+ * @param workerInfo My worker info
+ * @param jobId Id of the job
+ * @param jobToken Required for secure Hadoop
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private void startPeerConnectionThread(WorkerInfo workerInfo,
+ String jobId,
+ J jobToken) throws IOException, InterruptedException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("startPeerConnectionThread: hostname " +
+ workerInfo.getHostname() + ", port " +
+ workerInfo.getPort());
+ }
+ final InetSocketAddress addr =
+ new InetSocketAddress(workerInfo.getHostname(),
+ workerInfo.getPort());
+ // Cheap way to hold both the hostname and port (rather than
+ // make a class)
+ InetSocketAddress addrUnresolved =
+ InetSocketAddress.createUnresolved(addr.getHostName(),
+ addr.getPort());
+ Map<I, MsgList<M>> outMsgMap = null;
+ boolean isProxy = true;
+ CommunicationsInterface<I, V, E, M> peer = this;
+ synchronized (outMessages) {
+ outMsgMap = outMessages.get(addrUnresolved);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("startPeerConnectionThread: Connecting to " +
+ workerInfo.toString() + ", addr = " + addr +
+ " if outMsgMap (" + outMsgMap + ") == null ");
+ }
+ if (outMsgMap != null) { // this host has already been added
+ return;
+ }
+
+ if (myName.equals(addr.toString())) {
+ isProxy = false;
+ } else {
+ peer = getRPCProxy(addr, jobId, jobToken);
+ }
+
+ outMsgMap = new HashMap<I, MsgList<M>>();
+ outMessages.put(addrUnresolved, outMsgMap);
+ }
+
+ PeerConnection peerConnection =
+ new PeerConnection(outMsgMap, peer, isProxy);
+ peerConnections.put(addrUnresolved, peerConnection);
+ }
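
The unresolved address really is just a (hostname, port) pair: createUnresolved() performs no DNS lookup, and two unresolved addresses with the same hostname and port compare equal, which is what makes them usable as keys for outMessages and peerConnections. A tiny sketch (the hostname is made up):

    import java.net.InetSocketAddress;

    /** Sketch of the unresolved-address key used for the peer connection maps. */
    public class PeerKeySketch {
      public static void main(String[] args) {
        // createUnresolved() skips the DNS lookup, so the key is effectively just
        // "hostname + port" -- a cheap stand-in for a dedicated (host, port) class.
        InetSocketAddress key1 =
            InetSocketAddress.createUnresolved("worker-3.example.com", 30017);
        InetSocketAddress key2 =
            InetSocketAddress.createUnresolved("worker-3.example.com", 30017);
        // Equal keys hash alike, so outMessages/peerConnections lookups find them.
        System.out.println(key1.equals(key2));   // true
      }
    }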
+
+ @Override
+ public final long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return VERSION_ID;
+ }
-/*if[HADOOP_FACEBOOK]
+ /*if[HADOOP_FACEBOOK]
public ProtocolSignature getProtocolSignature(
String protocol,
long clientVersion,
@@ -623,602 +690,600 @@ public abstract class BasicRPCCommunicat
}
end[HADOOP_FACEBOOK]*/
- @Override
- public void closeConnections() throws IOException {
- for(PeerConnection pc : peerConnections.values()) {
- pc.close();
- }
- }
-
-
- @Override
- public final void close() {
- LOG.info("close: shutting down RPC server");
- server.stop();
- }
-
- @Override
- public final void putMsg(I vertex, M msg) throws IOException {
- List<M> msgs = null;
- if (LOG.isDebugEnabled()) {
- LOG.debug("putMsg: Adding msg " + msg + " on vertex " + vertex);
- }
- if (inPrepareSuperstep) {
- // Called by combiner (main thread) during superstep preparation
- msgs = inMessages.get(vertex);
- if (msgs == null) {
- msgs = new ArrayList<M>();
- inMessages.put(vertex, msgs);
- }
- msgs.add(msg);
- }
- else {
- synchronized(transientInMessages) {
- msgs = transientInMessages.get(vertex);
- if (msgs == null) {
- msgs = new ArrayList<M>();
- transientInMessages.put(vertex, msgs);
- }
- }
- synchronized(msgs) {
- msgs.add(msg);
- }
- }
- }
-
- @Override
- public final void putMsgList(I vertex,
- MsgList<M> msgList) throws IOException {
- List<M> msgs = null;
- if (LOG.isDebugEnabled()) {
- LOG.debug("putMsgList: Adding msgList " + msgList +
- " on vertex " + vertex);
- }
- synchronized(transientInMessages) {
- msgs = transientInMessages.get(vertex);
- if (msgs == null) {
- msgs = new ArrayList<M>(msgList.size());
- transientInMessages.put(vertex, msgs);
- }
- }
- synchronized(msgs) {
- msgs.addAll(msgList);
- }
- }
-
- @Override
- public final void putVertexIdMessagesList(
- VertexIdMessagesList<I, M> vertexIdMessagesList)
- throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("putVertexIdMessagesList: Adding msgList " +
- vertexIdMessagesList);
- }
-
- List<M> messageList = null;
- for (VertexIdMessages<I, M> vertexIdMessages : vertexIdMessagesList) {
- synchronized(transientInMessages) {
- messageList =
- transientInMessages.get(vertexIdMessages.getVertexId());
- if (messageList == null) {
- messageList = new ArrayList<M>(
- vertexIdMessages.getMessageList().size());
- transientInMessages.put(
- vertexIdMessages.getVertexId(), messageList);
- }
- }
- synchronized(messageList) {
- messageList.addAll(vertexIdMessages.getMessageList());
- }
- }
- }
-
- @Override
- public final void putVertexList(int partitionId,
- VertexList<I, V, E, M> vertexList)
- throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("putVertexList: On partition id " + partitionId +
- " adding vertex list of size " + vertexList.size());
- }
- synchronized(inPartitionVertexMap) {
- if (vertexList.size() == 0) {
- return;
- }
- if (!inPartitionVertexMap.containsKey(partitionId)) {
- inPartitionVertexMap.put(partitionId,
- new ArrayList<BasicVertex<I, V, E, M>>(vertexList));
- } else {
- List<BasicVertex<I, V, E, M>> tmpVertexList =
- inPartitionVertexMap.get(partitionId);
- tmpVertexList.addAll(vertexList);
- }
- }
- }
-
- @Override
- public final void addEdge(I vertexIndex, Edge<I, E> edge) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("addEdge: Adding edge " + edge);
- }
- synchronized(inVertexMutationsMap) {
- VertexMutations<I, V, E, M> vertexMutations = null;
- if (!inVertexMutationsMap.containsKey(vertexIndex)) {
- vertexMutations = new VertexMutations<I, V, E, M>();
- inVertexMutationsMap.put(vertexIndex, vertexMutations);
- } else {
- vertexMutations = inVertexMutationsMap.get(vertexIndex);
- }
- vertexMutations.addEdge(edge);
- }
- }
-
- @Override
- public void removeEdge(I vertexIndex, I destinationVertexIndex) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("removeEdge: Removing edge on destination " +
- destinationVertexIndex);
- }
- synchronized(inVertexMutationsMap) {
- VertexMutations<I, V, E, M> vertexMutations = null;
- if (!inVertexMutationsMap.containsKey(vertexIndex)) {
- vertexMutations = new VertexMutations<I, V, E, M>();
- inVertexMutationsMap.put(vertexIndex, vertexMutations);
- } else {
- vertexMutations = inVertexMutationsMap.get(vertexIndex);
- }
- vertexMutations.removeEdge(destinationVertexIndex);
+ @Override
+ public void closeConnections() throws IOException {
+ for (PeerConnection pc : peerConnections.values()) {
+ pc.close();
+ }
+ }
+
+
+ @Override
+ public final void close() {
+ LOG.info("close: shutting down RPC server");
+ server.stop();
+ }
+
+ @Override
+ public final void putMsg(I vertex, M msg) throws IOException {
+ List<M> msgs = null;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("putMsg: Adding msg " + msg + " on vertex " + vertex);
+ }
+ if (inPrepareSuperstep) {
+ // Called by combiner (main thread) during superstep preparation
+ msgs = inMessages.get(vertex);
+ if (msgs == null) {
+ msgs = new ArrayList<M>();
+ inMessages.put(vertex, msgs);
+ }
+ msgs.add(msg);
+ } else {
+ synchronized (transientInMessages) {
+ msgs = transientInMessages.get(vertex);
+ if (msgs == null) {
+ msgs = new ArrayList<M>();
+ transientInMessages.put(vertex, msgs);
+ }
+ }
+ synchronized (msgs) {
+ msgs.add(msg);
+ }
+ }
+ }
+
+ @Override
+ public final void putMsgList(I vertex,
+ MsgList<M> msgList) throws IOException {
+ List<M> msgs = null;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("putMsgList: Adding msgList " + msgList +
+ " on vertex " + vertex);
+ }
+ synchronized (transientInMessages) {
+ msgs = transientInMessages.get(vertex);
+ if (msgs == null) {
+ msgs = new ArrayList<M>(msgList.size());
+ transientInMessages.put(vertex, msgs);
+ }
+ }
+ synchronized (msgs) {
+ msgs.addAll(msgList);
+ }
+ }
+
+ @Override
+ public final void putVertexIdMessagesList(
+ VertexIdMessagesList<I, M> vertexIdMessagesList)
+ throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("putVertexIdMessagesList: Adding msgList " +
+ vertexIdMessagesList);
+ }
+
+ List<M> messageList = null;
+ for (VertexIdMessages<I, M> vertexIdMessages : vertexIdMessagesList) {
+ synchronized (transientInMessages) {
+ messageList =
+ transientInMessages.get(vertexIdMessages.getVertexId());
+ if (messageList == null) {
+ messageList = new ArrayList<M>(
+ vertexIdMessages.getMessageList().size());
+ transientInMessages.put(
+ vertexIdMessages.getVertexId(), messageList);
+ }
+ }
+ synchronized (messageList) {
+ messageList.addAll(vertexIdMessages.getMessageList());
+ }
+ }
+ }
+
+ @Override
+ public final void putVertexList(int partitionId,
+ VertexList<I, V, E, M> vertexList) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("putVertexList: On partition id " + partitionId +
+ " adding vertex list of size " + vertexList.size());
+ }
+ synchronized (inPartitionVertexMap) {
+ if (vertexList.size() == 0) {
+ return;
+ }
+ if (!inPartitionVertexMap.containsKey(partitionId)) {
+ inPartitionVertexMap.put(partitionId,
+ new ArrayList<BasicVertex<I, V, E, M>>(vertexList));
+ } else {
+ List<BasicVertex<I, V, E, M>> tmpVertexList =
+ inPartitionVertexMap.get(partitionId);
+ tmpVertexList.addAll(vertexList);
+ }
+ }
+ }
+
+ @Override
+ public final void addEdge(I vertexIndex, Edge<I, E> edge) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("addEdge: Adding edge " + edge);
+ }
+ synchronized (inVertexMutationsMap) {
+ VertexMutations<I, V, E, M> vertexMutations = null;
+ if (!inVertexMutationsMap.containsKey(vertexIndex)) {
+ vertexMutations = new VertexMutations<I, V, E, M>();
+ inVertexMutationsMap.put(vertexIndex, vertexMutations);
+ } else {
+ vertexMutations = inVertexMutationsMap.get(vertexIndex);
+ }
+ vertexMutations.addEdge(edge);
+ }
+ }
+
+ @Override
+ public void removeEdge(I vertexIndex, I destinationVertexIndex) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("removeEdge: Removing edge on destination " +
+ destinationVertexIndex);
+ }
+ synchronized (inVertexMutationsMap) {
+ VertexMutations<I, V, E, M> vertexMutations = null;
+ if (!inVertexMutationsMap.containsKey(vertexIndex)) {
+ vertexMutations = new VertexMutations<I, V, E, M>();
+ inVertexMutationsMap.put(vertexIndex, vertexMutations);
+ } else {
+ vertexMutations = inVertexMutationsMap.get(vertexIndex);
+ }
+ vertexMutations.removeEdge(destinationVertexIndex);
+ }
+ }
+
+ @Override
+ public final void addVertex(BasicVertex<I, V, E, M> vertex) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("addVertex: Adding vertex " + vertex);
+ }
+ synchronized (inVertexMutationsMap) {
+ VertexMutations<I, V, E, M> vertexMutations = null;
+ if (!inVertexMutationsMap.containsKey(vertex.getVertexId())) {
+ vertexMutations = new VertexMutations<I, V, E, M>();
+ inVertexMutationsMap.put(vertex.getVertexId(), vertexMutations);
+ } else {
+ vertexMutations = inVertexMutationsMap.get(vertex.getVertexId());
+ }
+ vertexMutations.addVertex(vertex);
+ }
+ }
+
+ @Override
+ public void removeVertex(I vertexIndex) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("removeVertex: Removing vertex " + vertexIndex);
+ }
+ synchronized (inVertexMutationsMap) {
+ VertexMutations<I, V, E, M> vertexMutations = null;
+ if (!inVertexMutationsMap.containsKey(vertexIndex)) {
+ vertexMutations = new VertexMutations<I, V, E, M>();
+ inVertexMutationsMap.put(vertexIndex, vertexMutations);
+ } else {
+ vertexMutations = inVertexMutationsMap.get(vertexIndex);
+ }
+ vertexMutations.removeVertex();
+ }
+ }
+
+ @Override
+ public final void sendPartitionReq(WorkerInfo workerInfo,
+ Partition<I, V, E, M> partition) {
+ // Internally, break up the sending so that the list doesn't get too
+ // big.
+ VertexList<I, V, E, M> hadoopVertexList =
+ new VertexList<I, V, E, M>();
+ InetSocketAddress addr =
+ getInetSocketAddress(workerInfo, partition.getPartitionId());
+ CommunicationsInterface<I, V, E, M> rpcProxy =
+ peerConnections.get(addr).getRPCProxy();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("sendPartitionReq: Sending to " + rpcProxy.getName() +
+ " " + addr + " from " + workerInfo +
+ ", with partition " + partition);
+ }
+ for (BasicVertex<I, V, E, M> vertex : partition.getVertices()) {
+ hadoopVertexList.add(vertex);
+ if (hadoopVertexList.size() >= MAX_VERTICES_PER_RPC) {
+ try {
+ rpcProxy.putVertexList(partition.getPartitionId(),
+ hadoopVertexList);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
+ hadoopVertexList.clear();
+ }
}
-
- @Override
- public final void addVertex(BasicVertex<I, V, E, M> vertex) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("addVertex: Adding vertex " + vertex);
- }
- synchronized(inVertexMutationsMap) {
- VertexMutations<I, V, E, M> vertexMutations = null;
- if (!inVertexMutationsMap.containsKey(vertex.getVertexId())) {
- vertexMutations = new VertexMutations<I, V, E, M>();
- inVertexMutationsMap.put(vertex.getVertexId(), vertexMutations);
- } else {
- vertexMutations = inVertexMutationsMap.get(vertex.getVertexId());
+ if (hadoopVertexList.size() > 0) {
+ try {
+ rpcProxy.putVertexList(partition.getPartitionId(),
+ hadoopVertexList);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Fill the socket address cache for the worker info and its partition.
+ *
+ * @param workerInfo Worker information to get the socket address
+ * @param partitionId Partition id to look up.
+ * @return address of the vertex range server containing this vertex
+ */
+ private InetSocketAddress getInetSocketAddress(WorkerInfo workerInfo,
+ int partitionId) {
+ synchronized (partitionIndexAddressMap) {
+ InetSocketAddress address =
+ partitionIndexAddressMap.get(partitionId);
+ if (address == null) {
+ address = InetSocketAddress.createUnresolved(
+ workerInfo.getHostname(),
+ workerInfo.getPort());
+ partitionIndexAddressMap.put(partitionId, address);
+ }
+
+ if (address.getPort() != workerInfo.getPort() ||
+ !address.getHostName().equals(workerInfo.getHostname())) {
+ throw new IllegalStateException(
+ "getInetSocketAddress: Impossible that address " +
+ address + " does not match " + workerInfo);
+ }
+
+ return address;
+ }
+ }
+
+ /**
+ * Fill the socket address cache for the partition owner.
+ *
+ * @param destVertex vertex to be sent
+ * @return address of the vertex range server containing this vertex
+ */
+ private InetSocketAddress getInetSocketAddress(I destVertex) {
+ PartitionOwner partitionOwner =
+ service.getVertexPartitionOwner(destVertex);
+ return getInetSocketAddress(partitionOwner.getWorkerInfo(),
+ partitionOwner.getPartitionId());
+ }
+
+ @Override
+ public final void sendMessageReq(I destVertex, M msg) {
+ InetSocketAddress addr = getInetSocketAddress(destVertex);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("sendMessage: Send bytes (" + msg.toString() +
+ ") to " + destVertex + " with address " + addr);
+ }
+ ++totalMsgsSentInSuperstep;
+ Map<I, MsgList<M>> msgMap = null;
+ synchronized (outMessages) {
+ msgMap = outMessages.get(addr);
+ }
+ if (msgMap == null) { // should never happen after constructor
+ throw new RuntimeException(
+ "sendMessage: msgMap did not exist for " + addr +
+ " for vertex " + destVertex);
+ }
+
+ synchronized (msgMap) {
+ MsgList<M> msgList = msgMap.get(destVertex);
+ if (msgList == null) { // should only happen once
+ msgList = new MsgList<M>();
+ msgMap.put(destVertex, msgList);
+ }
+ msgList.add(msg);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("sendMessage: added msg=" + msg + ", size=" +
+ msgList.size());
+ }
+ if (msgList.size() > maxSize) {
+ submitLargeMessageSend(addr, destVertex);
+ }
+ }
+ }
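
sendMessageReq() holds the global outMessages lock only long enough to look up the per-worker map, then synchronizes on that smaller map to append the message; the maxSize check at the end is what hands a hot vertex's list to LargeMessageFlushExecutor in the middle of a superstep. A condensed sketch of that two-level locking (illustrative types, not the Giraph API):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    /** Sketch of the two-level locking used when caching an outgoing message. */
    public class TwoLevelLockSketch {
      private final Map<String, Map<Long, List<String>>> outMessages =
          new HashMap<String, Map<Long, List<String>>>();

      void cacheMessage(String workerAddr, long destVertex, String msg, int maxSize) {
        Map<Long, List<String>> msgMap;
        synchronized (outMessages) {     // global lock: held only for the lookup
          msgMap = outMessages.get(workerAddr);
        }
        if (msgMap == null) {
          throw new IllegalStateException("No connection known for " + workerAddr);
        }
        synchronized (msgMap) {          // per-worker lock: guards the actual append
          List<String> msgList = msgMap.get(destVertex);
          if (msgList == null) {
            msgList = new ArrayList<String>();
            msgMap.put(destVertex, msgList);
          }
          msgList.add(msg);
          if (msgList.size() > maxSize) {
            // BasicRPCCommunications calls submitLargeMessageSend(addr, destVertex)
            // here to flush just this vertex's list on a background thread.
          }
        }
      }
    }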
+
+ @Override
+ public final void addEdgeReq(I destVertex, Edge<I, E> edge)
+ throws IOException {
+ InetSocketAddress addr = getInetSocketAddress(destVertex);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("addEdgeReq: Add edge (" + edge.toString() + ") to " +
+ destVertex + " with address " + addr);
+ }
+ CommunicationsInterface<I, V, E, M> rpcProxy =
+ peerConnections.get(addr).getRPCProxy();
+ rpcProxy.addEdge(destVertex, edge);
+ }
+
+ @Override
+ public final void removeEdgeReq(I vertexIndex, I destVertexIndex)
+ throws IOException {
+ InetSocketAddress addr = getInetSocketAddress(vertexIndex);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("removeEdgeReq: Remove edge (" + destVertexIndex +
+ ") from " + vertexIndex + " with address " + addr);
+ }
+ CommunicationsInterface<I, V, E, M> rpcProxy =
+ peerConnections.get(addr).getRPCProxy();
+ rpcProxy.removeEdge(vertexIndex, destVertexIndex);
+ }
+
+ @Override
+ public final void addVertexReq(BasicVertex<I, V, E, M> vertex)
+ throws IOException {
+ InetSocketAddress addr = getInetSocketAddress(vertex.getVertexId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("addVertexReq: Add vertex (" + vertex + ") " +
+ " with address " + addr);
+ }
+ CommunicationsInterface<I, V, E, M> rpcProxy =
+ peerConnections.get(addr).getRPCProxy();
+ rpcProxy.addVertex(vertex);
+ }
+
+ @Override
+ public void removeVertexReq(I vertexIndex) throws IOException {
+ InetSocketAddress addr =
+ getInetSocketAddress(vertexIndex);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("removeVertexReq: Remove vertex index (" +
+ vertexIndex + ") with address " + addr);
+ }
+ CommunicationsInterface<I, V, E, M> rpcProxy =
+ peerConnections.get(addr).getRPCProxy();
+ rpcProxy.removeVertex(vertexIndex);
+ }
+
+ @Override
+ public long flush(Mapper<?, ?, ?, ?>.Context context) throws IOException {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("flush: starting for superstep " +
+ service.getSuperstep() + " " +
+ MemoryUtils.getRuntimeMemoryStats());
+ }
+ for (List<M> msgList : inMessages.values()) {
+ msgList.clear();
+ }
+ inMessages.clear();
+
+ Collection<Future<?>> futures = new ArrayList<Future<?>>();
+
+ // randomize peers in order to avoid hotspots on racks
+ List<PeerConnection> peerList =
+ new ArrayList<PeerConnection>(peerConnections.values());
+ Collections.shuffle(peerList);
+
+ for (PeerConnection pc : peerList) {
+ futures.add(executor.submit(new PeerFlushExecutor(pc, context)));
+ }
+
+ // wait for all flushes
+ for (Future<?> future : futures) {
+ try {
+ future.get();
+ context.progress();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("flush: Got InterruptedException", e);
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(
+ "flush: Got ExecutionException", e);
+ }
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("flush: ended for superstep " +
+ service.getSuperstep() + " " +
+ MemoryUtils.getRuntimeMemoryStats());
+ }
+
+ long msgs = totalMsgsSentInSuperstep;
+ totalMsgsSentInSuperstep = 0;
+ return msgs;
+ }
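
The flush above shuffles the peer list so that all workers do not start by flushing to the same peer (and rack), then fans the PeerFlushExecutors out on the shared thread pool and blocks on their futures. A condensed sketch of that pattern, with made-up worker names:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    /** Sketch of the shuffled, parallel flush used at the end of a superstep. */
    public class FlushPatternSketch {
      public static void main(String[] args) throws Exception {
        List<String> peers = new ArrayList<String>();
        for (int i = 0; i < 5; i++) {
          peers.add("worker-" + i);
        }
        // Shuffle so every worker starts its flush against a different peer,
        // instead of all of them hitting worker-0 (and its rack) first.
        Collections.shuffle(peers);

        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<Future<?>> futures = new ArrayList<Future<?>>();
        for (final String peer : peers) {
          futures.add(executor.submit(new Runnable() {
            public void run() {
              System.out.println("flushing messages to " + peer);
            }
          }));
        }
        for (Future<?> future : futures) {
          future.get();   // wait for every flush; Giraph also reports progress here
        }
        executor.shutdown();
      }
    }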
+
+ @Override
+ public void prepareSuperstep() {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("prepareSuperstep: Superstep " +
+ service.getSuperstep() + " " +
+ MemoryUtils.getRuntimeMemoryStats());
+ }
+ inPrepareSuperstep = true;
+
+ // Combine and put the transient messages into the inMessages.
+ synchronized (transientInMessages) {
+ for (Entry<I, List<M>> entry : transientInMessages.entrySet()) {
+ if (combiner != null) {
+ try {
+ Iterable<M> messages =
+ combiner.combine(entry.getKey(),
+ entry.getValue());
+ if (messages == null) {
+ throw new IllegalStateException(
+ "prepareSuperstep: Combiner cannot " +
+ "return null");
+ }
+ if (Iterables.size(entry.getValue()) <
+ Iterables.size(messages)) {
+ throw new IllegalStateException(
+ "prepareSuperstep: The number of " +
+ "combined messages is " +
+ "required to be <= to the number of " +
+ "messages to be combined");
}
- vertexMutations.addVertex(vertex);
- }
- }
-
- @Override
- public void removeVertex(I vertexIndex) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("removeVertex: Removing vertex " + vertexIndex);
- }
- synchronized(inVertexMutationsMap) {
- VertexMutations<I, V, E, M> vertexMutations = null;
- if (!inVertexMutationsMap.containsKey(vertexIndex)) {
- vertexMutations = new VertexMutations<I, V, E, M>();
- inVertexMutationsMap.put(vertexIndex, vertexMutations);
- } else {
- vertexMutations = inVertexMutationsMap.get(vertexIndex);
+ for (M msg: messages) {
+ putMsg(entry.getKey(), msg);
}
- vertexMutations.removeVertex();
- }
- }
-
- @Override
- public final void sendPartitionReq(WorkerInfo workerInfo,
- Partition<I, V, E, M> partition) {
- // Internally, break up the sending so that the list doesn't get too
- // big.
- VertexList<I, V, E, M> hadoopVertexList =
- new VertexList<I, V, E, M>();
- InetSocketAddress addr =
- getInetSocketAddress(workerInfo, partition.getPartitionId());
- CommunicationsInterface<I, V, E, M> rpcProxy =
- peerConnections.get(addr).getRPCProxy();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("sendPartitionReq: Sending to " + rpcProxy.getName() +
- " " + addr + " from " + workerInfo +
- ", with partition " + partition);
- }
+ } catch (IOException e) {
+ // no actual IO -- should never happen
+ throw new RuntimeException(e);
+ }
+ } else {
+ List<M> msgs = inMessages.get(entry.getKey());
+ if (msgs == null) {
+ msgs = new ArrayList<M>();
+ inMessages.put(entry.getKey(), msgs);
+ }
+ msgs.addAll(entry.getValue());
+ }
+ entry.getValue().clear();
+ }
+ transientInMessages.clear();
+ }
+
+ if (inMessages.size() > 0) {
+ // Assign the messages to each destination vertex (getting rid of
+ // the old ones)
+ for (Partition<I, V, E, M> partition :
+ service.getPartitionMap().values()) {
for (BasicVertex<I, V, E, M> vertex : partition.getVertices()) {
- hadoopVertexList.add(vertex);
- if (hadoopVertexList.size() >= MAX_VERTICES_PER_RPC) {
- try {
- rpcProxy.putVertexList(partition.getPartitionId(),
- hadoopVertexList);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- hadoopVertexList.clear();
- }
- }
- if (hadoopVertexList.size() > 0) {
- try {
- rpcProxy.putVertexList(partition.getPartitionId(),
- hadoopVertexList);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- /**
- * Fill the socket address cache for the worker info and its partition.
- *
- * @param workerInfo Worker information to get the socket address
- * @param partitionId
- * @return address of the vertex range server containing this vertex
- */
- private InetSocketAddress getInetSocketAddress(WorkerInfo workerInfo,
- int partitionId) {
- synchronized(partitionIndexAddressMap) {
- InetSocketAddress address =
- partitionIndexAddressMap.get(partitionId);
- if (address == null) {
- address = InetSocketAddress.createUnresolved(
- workerInfo.getHostname(),
- workerInfo.getPort());
- partitionIndexAddressMap.put(partitionId, address);
- }
-
- if (address.getPort() != workerInfo.getPort() ||
- !address.getHostName().equals(workerInfo.getHostname())) {
- throw new IllegalStateException(
- "getInetSocketAddress: Impossible that address " +
- address + " does not match " + workerInfo);
- }
-
- return address;
- }
- }
-
- /**
- * Fill the socket address cache for the partition owner.
- *
- * @param destVertex vertex to be sent
- * @return address of the vertex range server containing this vertex
- */
- private InetSocketAddress getInetSocketAddress(I destVertex) {
- PartitionOwner partitionOwner =
- service.getVertexPartitionOwner(destVertex);
- return getInetSocketAddress(partitionOwner.getWorkerInfo(),
- partitionOwner.getPartitionId());
- }
-
- @Override
- public final void sendMessageReq(I destVertex, M msg) {
- InetSocketAddress addr = getInetSocketAddress(destVertex);
- if (LOG.isDebugEnabled()) {
- LOG.debug("sendMessage: Send bytes (" + msg.toString() +
- ") to " + destVertex + " with address " + addr);
- }
- ++totalMsgsSentInSuperstep;
- Map<I, MsgList<M>> msgMap = null;
- synchronized(outMessages) {
- msgMap = outMessages.get(addr);
- }
- if (msgMap == null) { // should never happen after constructor
- throw new RuntimeException(
- "sendMessage: msgMap did not exist for " + addr +
- " for vertex " + destVertex);
- }
-
- synchronized(msgMap) {
- MsgList<M> msgList = msgMap.get(destVertex);
- if (msgList == null) { // should only happen once
- msgList = new MsgList<M>();
- msgMap.put(destVertex, msgList);
- }
- msgList.add(msg);
+ List<M> msgList = inMessages.get(vertex.getVertexId());
+ if (msgList != null) {
if (LOG.isDebugEnabled()) {
- LOG.debug("sendMessage: added msg=" + msg + ", size=" +
- msgList.size());
+ LOG.debug("prepareSuperstep: Assigning " +
+ msgList.size() +
+                " msgs to vertex index " + vertex);
+ }
+ for (M msg : msgList) {
+ if (msg == null) {
+ LOG.warn("prepareSuperstep: Null message " +
+ "in inMessages");
+ }
}
- if (msgList.size() > maxSize) {
- submitLargeMessageSend(addr, destVertex);
- }
- }
- }
-
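// A simplified sketch of the per-destination buffering pattern used by
// sendMessageReq(): each outgoing message is appended to a list keyed by the
// destination worker's address, and a flush is triggered once that list grows
// past a threshold. The FLUSH_THRESHOLD value and flushTo() helper here are
// hypothetical stand-ins for the surrounding class's internals.
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MessageBufferSketch<M> {
  /** Hypothetical limit on buffered messages per destination */
  private static final int FLUSH_THRESHOLD = 1000;
  /** Buffered messages keyed by destination worker address */
  private final Map<InetSocketAddress, List<M>> buffers =
      new HashMap<InetSocketAddress, List<M>>();

  /** Buffer a message and flush the destination's list if it grew too large. */
  public synchronized void buffer(InetSocketAddress destination, M message) {
    List<M> messages = buffers.get(destination);
    if (messages == null) {
      messages = new ArrayList<M>();
      buffers.put(destination, messages);
    }
    messages.add(message);
    if (messages.size() > FLUSH_THRESHOLD) {
      flushTo(destination, messages);  // e.g. an RPC putMsgList() call
      messages.clear();
    }
  }

  /** Stand-in for the actual RPC send; intentionally a no-op here. */
  private void flushTo(InetSocketAddress destination, List<M> messages) {
  }
}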
- @Override
- public final void addEdgeReq(I destVertex, Edge<I, E> edge)
- throws IOException {
- InetSocketAddress addr = getInetSocketAddress(destVertex);
- if (LOG.isDebugEnabled()) {
- LOG.debug("addEdgeReq: Add edge (" + edge.toString() + ") to " +
- destVertex + " with address " + addr);
- }
- CommunicationsInterface<I, V, E, M> rpcProxy =
- peerConnections.get(addr).getRPCProxy();
- rpcProxy.addEdge(destVertex, edge);
- }
-
- @Override
- public final void removeEdgeReq(I vertexIndex, I destVertexIndex)
- throws IOException {
- InetSocketAddress addr = getInetSocketAddress(vertexIndex);
- if (LOG.isDebugEnabled()) {
- LOG.debug("removeEdgeReq: remove edge (" + destVertexIndex +
- ") from" + vertexIndex + " with address " + addr);
- }
- CommunicationsInterface<I, V, E, M> rpcProxy =
- peerConnections.get(addr).getRPCProxy();
- rpcProxy.removeEdge(vertexIndex, destVertexIndex);
- }
-
- @Override
- public final void addVertexReq(BasicVertex<I, V, E, M> vertex)
- throws IOException {
- InetSocketAddress addr = getInetSocketAddress(vertex.getVertexId());
- if (LOG.isDebugEnabled()) {
- LOG.debug("addVertexReq: Add vertex (" + vertex + ") " +
- " with address " + addr);
- }
- CommunicationsInterface<I, V, E, M> rpcProxy =
- peerConnections.get(addr).getRPCProxy();
- rpcProxy.addVertex(vertex);
- }
-
- @Override
- public void removeVertexReq(I vertexIndex) throws IOException {
- InetSocketAddress addr =
- getInetSocketAddress(vertexIndex);
- if (LOG.isDebugEnabled()) {
- LOG.debug("removeVertexReq: Remove vertex index ("
- + vertexIndex + ") with address " + addr);
- }
- CommunicationsInterface<I, V, E, M> rpcProxy =
- peerConnections.get(addr).getRPCProxy();
- rpcProxy.removeVertex(vertexIndex);
- }
-
- @Override
- public long flush(Mapper<?, ?, ?, ?>.Context context) throws IOException {
- if (LOG.isInfoEnabled()) {
- LOG.info("flush: starting for superstep " +
- service.getSuperstep() + " " +
- MemoryUtils.getRuntimeMemoryStats());
- }
- for (List<M> msgList : inMessages.values()) {
+ service.assignMessagesToVertex(vertex, msgList);
msgList.clear();
- }
- inMessages.clear();
-
- Collection<Future<?>> futures = new ArrayList<Future<?>>();
-
- // randomize peers in order to avoid hotspot on racks
- List<PeerConnection> peerList =
- new ArrayList<PeerConnection>(peerConnections.values());
- Collections.shuffle(peerList);
-
- for (PeerConnection pc : peerList) {
- futures.add(executor.submit(new PeerFlushExecutor(pc, context)));
- }
-
- // wait for all flushes
- for (Future<?> future : futures) {
- try {
- future.get();
- context.progress();
- } catch (InterruptedException e) {
- throw new IllegalStateException("flush: Got IOException", e);
- } catch (ExecutionException e) {
- throw new IllegalStateException(
- "flush: Got ExecutionException", e);
- }
- }
-
- if (LOG.isInfoEnabled()) {
- LOG.info("flush: ended for superstep " +
- service.getSuperstep() + " " +
- MemoryUtils.getRuntimeMemoryStats());
- }
-
- long msgs = totalMsgsSentInSuperstep;
- totalMsgsSentInSuperstep = 0;
- return msgs;
- }
-
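// A condensed sketch of the flush pattern in the method above: shuffle the peer
// connections so that workers do not all hit the same rack first, submit one
// flush task per peer to an executor, and block on the futures until every
// per-peer flush has finished. The thread-pool size and the Runnable tasks are
// stand-ins; the real code uses PeerFlushExecutor and reports progress while
// waiting.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelFlushSketch {
  public static void flushAll(List<Runnable> peerFlushTasks) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
      // Randomize the peer order to avoid hotspots on a single rack.
      List<Runnable> shuffled = new ArrayList<Runnable>(peerFlushTasks);
      Collections.shuffle(shuffled);

      List<Future<?>> futures = new ArrayList<Future<?>>();
      for (Runnable task : shuffled) {
        futures.add(executor.submit(task));
      }
      // Wait for every per-peer flush before the superstep can complete.
      for (Future<?> future : futures) {
        future.get();
      }
    } finally {
      executor.shutdown();
    }
  }
}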
- @Override
- public void prepareSuperstep() {
- if (LOG.isInfoEnabled()) {
- LOG.info("prepareSuperstep: Superstep " +
- service.getSuperstep() + " " +
- MemoryUtils.getRuntimeMemoryStats());
- }
- inPrepareSuperstep = true;
-
- // Combine and put the transient messages into the inMessages.
- synchronized(transientInMessages) {
- for (Entry<I, List<M>> entry : transientInMessages.entrySet()) {
- if (combiner != null) {
- try {
- Iterable<M> messages =
- combiner.combine(entry.getKey(),
- entry.getValue());
- if (messages == null) {
- throw new IllegalStateException(
- "prepareSuperstep: Combiner cannot " +
- "return null");
- }
- if (Iterables.size(entry.getValue()) <
- Iterables.size(messages)) {
- throw new IllegalStateException(
- "prepareSuperstep: The number of " +
- "combined messages is " +
- "required to be <= to the number of " +
- "messages to be combined");
- }
- for (M msg: messages) {
- putMsg(entry.getKey(), msg);
- }
- } catch (IOException e) {
- // no actual IO -- should never happen
- throw new RuntimeException(e);
- }
- } else {
- List<M> msgs = inMessages.get(entry.getKey());
- if (msgs == null) {
- msgs = new ArrayList<M>();
- inMessages.put(entry.getKey(), msgs);
- }
- msgs.addAll(entry.getValue());
- }
- entry.getValue().clear();
- }
- transientInMessages.clear();
- }
-
- if (inMessages.size() > 0) {
- // Assign the messages to each destination vertex (getting rid of
- // the old ones)
- for (Partition<I, V, E, M> partition :
- service.getPartitionMap().values()) {
- for (BasicVertex<I, V, E, M> vertex : partition.getVertices()) {
- List<M> msgList = inMessages.get(vertex.getVertexId());
- if (msgList != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("prepareSuperstep: Assigning " +
- msgList.size() +
- " mgs to vertex index " + vertex);
- }
- for (M msg : msgList) {
- if (msg == null) {
- LOG.warn("prepareSuperstep: Null message " +
- "in inMessages");
- }
- }
- service.assignMessagesToVertex(vertex, msgList);
- msgList.clear();
- if (inMessages.remove(vertex.getVertexId()) == null) {
- throw new IllegalStateException(
- "prepareSuperstep: Impossible to not remove " +
- vertex);
- }
- }
- }
- }
- }
-
- inPrepareSuperstep = false;
-
- // Resolve what happens when messages are sent to non-existent vertices
- // and vertices that have mutations. Also make sure that the messages
- // are being sent to the correct destination
- Set<I> resolveVertexIndexSet = new TreeSet<I>();
- if (inMessages.size() > 0) {
- for (Entry<I, List<M>> entry : inMessages.entrySet()) {
- if (service.getPartition(entry.getKey()) == null) {
- throw new IllegalStateException(
- "prepareSuperstep: Impossible that this worker " +
- service.getWorkerInfo() + " was sent " +
- entry.getValue().size() + " message(s) with " +
- "vertex id " + entry.getKey() +
- " when it does not own this partition. It should " +
- "have gone to partition owner " +
- service.getVertexPartitionOwner(entry.getKey()) +
- ". The partition owners are " +
- service.getPartitionOwners());
- }
- resolveVertexIndexSet.add(entry.getKey());
- }
- }
- synchronized(inVertexMutationsMap) {
- for (I vertexIndex : inVertexMutationsMap.keySet()) {
- resolveVertexIndexSet.add(vertexIndex);
- }
- }
-
- // Resolve all graph mutations
- for (I vertexIndex : resolveVertexIndexSet) {
- VertexResolver<I, V, E, M> vertexResolver =
- BspUtils.createVertexResolver(
- conf, service.getGraphMapper().getGraphState());
- BasicVertex<I, V, E, M> originalVertex =
- service.getVertex(vertexIndex);
- Iterable<M> messages = inMessages.get(vertexIndex);
- if (originalVertex != null) {
- messages = originalVertex.getMessages();
- }
- VertexMutations<I, V, E, M> vertexMutations =
- inVertexMutationsMap.get(vertexIndex);
- BasicVertex<I, V, E, M> vertex =
- vertexResolver.resolve(vertexIndex,
- originalVertex,
- vertexMutations,
- messages);
- if (LOG.isDebugEnabled()) {
- LOG.debug("prepareSuperstep: Resolved vertex index " +
- vertexIndex + " with original vertex " +
- originalVertex + ", returned vertex " + vertex +
- " on superstep " + service.getSuperstep() +
- " with mutations " +
- vertexMutations);
- }
-
- Partition<I, V, E, M> partition =
- service.getPartition(vertexIndex);
- if (partition == null) {
- throw new IllegalStateException(
- "prepareSuperstep: No partition for index " + vertexIndex +
- " in " + service.getPartitionMap() + " should have been " +
- service.getVertexPartitionOwner(vertexIndex));
- }
- if (vertex != null) {
- ((MutableVertex<I, V, E, M>) vertex).setVertexId(vertexIndex);
- partition.putVertex((BasicVertex<I, V, E, M>) vertex);
- } else if (originalVertex != null) {
- partition.removeVertex(originalVertex.getVertexId());
- }
- }
- synchronized(inVertexMutationsMap) {
- inVertexMutationsMap.clear();
- }
- }
-
- @Override
- public void fixPartitionIdToSocketAddrMap() {
- // 1. Fix all the cached inet addresses (remove all changed entries)
- // 2. Connect to any new RPC servers
- synchronized(partitionIndexAddressMap) {
- for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
- InetSocketAddress address =
- partitionIndexAddressMap.get(
- partitionOwner.getPartitionId());
- if (address != null &&
- (!address.getHostName().equals(
- partitionOwner.getWorkerInfo().getHostname()) ||
- address.getPort() !=
- partitionOwner.getWorkerInfo().getPort())) {
- if (LOG.isInfoEnabled()) {
- LOG.info("fixPartitionIdToSocketAddrMap: " +
- "Partition owner " +
- partitionOwner + " changed from " +
- address);
- }
- partitionIndexAddressMap.remove(
- partitionOwner.getPartitionId());
- }
- }
- }
- try {
- connectAllRPCProxys(this.jobId, this.jobToken);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public String getName() {
- return myName;
- }
-
- @Override
- public Map<Integer, List<BasicVertex<I, V, E, M>>> getInPartitionVertexMap() {
- return inPartitionVertexMap;
- }
+ if (inMessages.remove(vertex.getVertexId()) == null) {
+ throw new IllegalStateException(
+ "prepareSuperstep: Impossible to not remove " +
+ vertex);
+ }
+ }
+ }
+ }
+ }
+
+ inPrepareSuperstep = false;
+
+ // Resolve what happens when messages are sent to non-existent vertices
+ // and vertices that have mutations. Also make sure that the messages
+ // are being sent to the correct destination
+ Set<I> resolveVertexIndexSet = new TreeSet<I>();
+ if (inMessages.size() > 0) {
+ for (Entry<I, List<M>> entry : inMessages.entrySet()) {
+ if (service.getPartition(entry.getKey()) == null) {
+ throw new IllegalStateException(
+ "prepareSuperstep: Impossible that this worker " +
+ service.getWorkerInfo() + " was sent " +
+ entry.getValue().size() + " message(s) with " +
+ "vertex id " + entry.getKey() +
+ " when it does not own this partition. It should " +
+ "have gone to partition owner " +
+ service.getVertexPartitionOwner(entry.getKey()) +
+ ". The partition owners are " +
+ service.getPartitionOwners());
+ }
+ resolveVertexIndexSet.add(entry.getKey());
+ }
+ }
+ synchronized (inVertexMutationsMap) {
+ for (I vertexIndex : inVertexMutationsMap.keySet()) {
+ resolveVertexIndexSet.add(vertexIndex);
+ }
+ }
+
+ // Resolve all graph mutations
+ for (I vertexIndex : resolveVertexIndexSet) {
+ VertexResolver<I, V, E, M> vertexResolver =
+ BspUtils.createVertexResolver(
+ conf, service.getGraphMapper().getGraphState());
+ BasicVertex<I, V, E, M> originalVertex =
+ service.getVertex(vertexIndex);
+ Iterable<M> messages = inMessages.get(vertexIndex);
+ if (originalVertex != null) {
+ messages = originalVertex.getMessages();
+ }
+ VertexMutations<I, V, E, M> vertexMutations =
+ inVertexMutationsMap.get(vertexIndex);
+ BasicVertex<I, V, E, M> vertex =
+ vertexResolver.resolve(vertexIndex,
+ originalVertex,
+ vertexMutations,
+ messages);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("prepareSuperstep: Resolved vertex index " +
+ vertexIndex + " with original vertex " +
+ originalVertex + ", returned vertex " + vertex +
+ " on superstep " + service.getSuperstep() +
+ " with mutations " +
+ vertexMutations);
+ }
+
+ Partition<I, V, E, M> partition =
+ service.getPartition(vertexIndex);
[... 63 lines stripped ...]
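// A schematic sketch of the decision the resolver loop above delegates to
// resolve(): given the original vertex (if any), the recorded mutations, and
// any pending messages, return the vertex that should exist afterwards, or
// null to have it removed -- which is exactly how the surrounding code
// interprets a null/non-null result. The policy below is one plausible choice,
// not necessarily Giraph's default VertexResolver, and all names are stand-ins.
public final class ResolutionSketch {
  private ResolutionSketch() { }

  /** Return the vertex that should exist after resolution, or null to drop it. */
  public static <V> V resolve(V originalVertex, boolean removeRequested,
      V addedVertex, boolean hasMessages) {
    if (removeRequested && addedVertex == null) {
      // An explicit removeVertex() with no competing addVertex() wins.
      return null;
    }
    if (originalVertex != null) {
      // Keep the existing vertex; its pending messages stay assigned to it.
      return originalVertex;
    }
    if (addedVertex != null) {
      // Apply a requested addVertex() mutation.
      return addedVertex;
    }
    if (hasMessages) {
      // Messages were sent to a vertex that does not exist; some policies
      // would create it here so the messages can be delivered, while this
      // sketch simply drops them.
      return null;
    }
    return null;
  }
}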