You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/08/15 22:07:32 UTC
svn commit: r1373609 - in /giraph/trunk: ./
src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/graph/
src/main/java/org/apache/giraph/graph/partition/
src/main/java/org/apache/giraph/utils/ src/test/java/org/apache/giraph/comm/
Author: aching
Date: Wed Aug 15 20:07:32 2012
New Revision: 1373609
URL: http://svn.apache.org/viewvc?rev=1373609&view=rev
Log:
GIRAPH-300: Improve netty reliability with retrying failed
connections, tracking requests, thread-safe hash partitioning (aching
via apresta).
Added:
giraph/trunk/src/main/java/org/apache/giraph/comm/RequestInfo.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/pom.xml
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java
giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java
giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java
giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java
giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMutationsRequest.java
giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java
giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
giraph/trunk/src/main/java/org/apache/giraph/utils/TimedLogger.java
giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Aug 15 20:07:32 2012
@@ -2,8 +2,12 @@ Giraph Change Log
Release 0.2.0 - unreleased
- GIRAPH-296: TotalNumVertices and TotalNumEdges are not saved in checkpoint.
- (majakabiljo via apresta)
+ GIRAPH-300: Improve netty reliability with retrying failed
+ connections, tracking requests, thread-safe hash partitioning
+ (aching via apresta).
+
+ GIRAPH-296: TotalNumVertices and TotalNumEdges are not saved in
+ checkpoint. (majakabiljo via apresta)
GIRAPH-297: Checkpointing on master is done one superstep later
(majakabiljo via aching).
@@ -18,7 +22,8 @@ Release 0.2.0 - unreleased
GIRAPH-218: Consolidate all I/O Format classes under one roof in
lib/ directory. (Eli Reisman via jghoman)
- GIRAPH-259: TestBspBasic.testBspPageRank is broken (majakabiljo via apresta)
+ GIRAPH-259: TestBspBasic.testBspPageRank is broken (majakabiljo via
+ apresta)
GIRAPH-256: Partitioning outgoing graph data during INPUT_SUPERSTEP
by # of vertices results in wide variance in RPC message sizes. (Eli
Modified: giraph/trunk/pom.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/pom.xml?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/pom.xml (original)
+++ giraph/trunk/pom.xml Wed Aug 15 20:07:32 2012
@@ -914,7 +914,7 @@ under the License.
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
- <version>3.3.1.Final</version>
+ <version>3.5.3.Final</version>
</dependency>
</dependencies>
</project>
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java Wed Aug 15 20:07:32 2012
@@ -18,16 +18,19 @@
package org.apache.giraph.comm;
+import com.google.common.collect.Lists;
+import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.utils.TimedLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -42,6 +45,7 @@ import org.jboss.netty.channel.ChannelPi
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.FixedLengthFrameDecoder;
/**
* Netty client for sending requests.
@@ -67,21 +71,26 @@ public class NettyClient<I extends Writa
"giraph.maxNumberOfOpenRequests";
/** Default maximum number of requests without confirmation */
public static final int MAX_NUMBER_OF_OPEN_REQUESTS_DEFAULT = 10000;
-
+ /** Maximum number of requests to list (for debugging) */
+ public static final int MAX_REQUESTS_TO_LIST = 10;
+ /** 30 seconds to connect by default */
+ public static final int MAX_CONNECTION_MILLISECONDS_DEFAULT = 30 * 1000;
/** Class logger */
private static final Logger LOG = Logger.getLogger(NettyClient.class);
/** Context used to report progress */
private final Mapper<?, ?, ?, ?>.Context context;
/** Client bootstrap */
private final ClientBootstrap bootstrap;
- /** Atomic count of outstanding requests (synchronize on self) */
- private final AtomicInteger waitingRequestCount = new AtomicInteger(0);
/**
* Map of the peer connections, mapping from remote socket address to client
* meta data
*/
private final Map<InetSocketAddress, ChannelRotater> addressChannelMap =
Maps.newHashMap();
+ /** Atomic request id, used in outstandingRequestMap */
+ private final AtomicInteger requestId = new AtomicInteger(0);
+ /** Outstanding request map (tracks all requests). */
+ private final ConcurrentMap<Long, RequestInfo> outstandingRequestMap;
/** Number of channels per server */
private final int channelsPerServer;
/** Byte counter for this client */
@@ -95,6 +104,10 @@ public class NettyClient<I extends Writa
private final boolean limitNumberOfOpenRequests;
/** Maximum number of requests without confirmation we can have */
private final int maxNumberOfOpenRequests;
+ /** Maximum number of connnection failures */
+ private final int maxConnectionFailures;
+ /** Timed logger for printing request debugging */
+ private final TimedLogger requestLogger = new TimedLogger(15 * 1000, LOG);
/**
* Only constructor
@@ -127,13 +140,27 @@ public class NettyClient<I extends Writa
maxNumberOfOpenRequests = -1;
}
+ maxConnectionFailures = conf.getInt(
+ GiraphJob.NETTY_MAX_CONNECTION_FAILURES,
+ GiraphJob.NETTY_MAX_CONNECTION_FAILURES_DEFAULT);
+
+ int maxThreads = conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+ NettyServer.MAXIMUM_THREAD_POOL_SIZE_DEFAULT);
+ outstandingRequestMap =
+ new MapMaker().concurrencyLevel(maxThreads).makeMap();
+
// Configure the client.
bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool(),
- conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
- NettyServer.DEFAULT_MAXIMUM_THREAD_POOL_SIZE)));
+ maxThreads));
+ bootstrap.setOption("connectTimeoutMillis",
+ MAX_CONNECTION_MILLISECONDS_DEFAULT);
+ bootstrap.setOption("tcpNoDelay", true);
+ bootstrap.setOption("keepAlive", true);
+ bootstrap.setOption("sendBufferSize", sendBufferSize);
+ bootstrap.setOption("receiveBufferSize", receiveBufferSize);
// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@@ -141,64 +168,121 @@ public class NettyClient<I extends Writa
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
byteCounter,
+ new FixedLengthFrameDecoder(9),
new RequestEncoder(),
- new ResponseClientHandler(waitingRequestCount));
+ new ResponseClientHandler(outstandingRequestMap));
}
});
}
/**
+ * Pair object for connectAllAddresses().
+ */
+ private static class ChannelFutureAddress {
+ /** Future object */
+ private final ChannelFuture future;
+ /** Address of the future */
+ private final InetSocketAddress address;
+
+ /**
+ * Constructor.
+ *
+ * @param future Immutable future
+ * @param address Immutable address
+ */
+ ChannelFutureAddress(ChannelFuture future, InetSocketAddress address) {
+ this.future = future;
+ this.address = address;
+ }
+ }
+
+ /**
* Connect to a collection of addresses
*
* @param addresses Addresses to connect to (if haven't already connected)
*/
- public void connectAllAddresses(Collection<InetSocketAddress> addresses) {
- List<ChannelFuture> waitingConnectionList =
- new ArrayList<ChannelFuture>();
+ public void connectAllAddresses(Set<InetSocketAddress> addresses) {
+ List<ChannelFutureAddress> waitingConnectionList =
+ Lists.newArrayListWithCapacity(addresses.size() * channelsPerServer);
for (InetSocketAddress address : addresses) {
- if (address == null) {
+ context.progress();
+ if (address == null || address.getHostName() == null ||
+ address.getHostName().isEmpty()) {
throw new IllegalStateException("connectAllAddresses: Null address " +
"in addresses " + addresses);
}
+ if (address.isUnresolved()) {
+ throw new IllegalStateException("connectAllAddresses: Unresolved " +
+ "address " + address);
+ }
if (addressChannelMap.containsKey(address)) {
continue;
}
// Start connecting to the remote server up to n time
- ChannelRotater channelRotater = new ChannelRotater();
for (int i = 0; i < channelsPerServer; ++i) {
ChannelFuture connectionFuture = bootstrap.connect(address);
- connectionFuture.getChannel().getConfig().setOption("tcpNoDelay", true);
- connectionFuture.getChannel().getConfig().setOption("keepAlive", true);
- connectionFuture.getChannel().getConfig().setOption(
- "sendBufferSize", sendBufferSize);
- connectionFuture.getChannel().getConfig().setOption(
- "receiveBufferSize", receiveBufferSize);
- channelRotater.addChannel(connectionFuture.getChannel());
- waitingConnectionList.add(connectionFuture);
+
+ waitingConnectionList.add(
+ new ChannelFutureAddress(connectionFuture, address));
}
- addressChannelMap.put(address, channelRotater);
}
- // Wait for all the connections to succeed
- for (ChannelFuture waitingConnection : waitingConnectionList) {
- ChannelFuture future =
- waitingConnection.awaitUninterruptibly();
- if (!future.isSuccess()) {
- throw new IllegalStateException("connectAllAddresses: Future failed " +
- "with " + future.getCause());
- }
- Channel channel = future.getChannel();
- if (LOG.isInfoEnabled()) {
- LOG.info("connectAllAddresses: Connected to " +
- channel.getRemoteAddress());
- }
+ // Wait for all the connections to succeed up to n tries
+ int failures = 0;
+ int connected = 0;
+ while (failures < maxConnectionFailures) {
+ List<ChannelFutureAddress> nextCheckFutures = Lists.newArrayList();
+ for (ChannelFutureAddress waitingConnection : waitingConnectionList) {
+ context.progress();
+ ChannelFuture future =
+ waitingConnection.future.awaitUninterruptibly();
+ if (!future.isSuccess()) {
+ LOG.warn("connectAllAddresses: Future failed " +
+ "to connect with " + waitingConnection.address + " with " +
+ failures + " failures and because of " + future.getCause());
+
+ ChannelFuture connectionFuture =
+ bootstrap.connect(waitingConnection.address);
+ nextCheckFutures.add(new ChannelFutureAddress(connectionFuture,
+ waitingConnection.address));
+ ++failures;
+ } else {
+ Channel channel = future.getChannel();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("connectAllAddresses: Connected to " +
+ channel.getRemoteAddress());
+ }
+
+ if (channel.getRemoteAddress() == null) {
+ throw new IllegalStateException(
+ "connectAllAddresses: Null remote address!");
+ }
- if (channel.getRemoteAddress() == null) {
- throw new IllegalStateException("connectAllAddresses: Null remote " +
- "address!");
+ ChannelRotater rotater =
+ addressChannelMap.get(waitingConnection.address);
+ if (rotater == null) {
+ rotater = new ChannelRotater();
+ addressChannelMap.put(waitingConnection.address, rotater);
+ }
+ rotater.addChannel(future.getChannel());
+ ++connected;
+ }
+ }
+ LOG.info("connectAllAddresses: Successfully added " +
+ (waitingConnectionList.size() - nextCheckFutures.size()) +
+ " connections, (" + connected + " total connected) " +
+ nextCheckFutures.size() + " failed, " +
+ failures + " failures total.");
+ if (nextCheckFutures.isEmpty()) {
+ break;
}
+ waitingConnectionList = nextCheckFutures;
+ }
+ if (failures >= maxConnectionFailures) {
+ throw new IllegalStateException(
+ "connectAllAddresses: Too many failures (" + failures + ").");
}
}
@@ -206,7 +290,7 @@ public class NettyClient<I extends Writa
* Stop the client.
*/
public void stop() {
- // close connections asyncronously, in a Netty-approved
+ // Close connections asyncronously, in a Netty-approved
// way, without cleaning up thread pools until all channels
// in addressChannelMap are closed (success or failure)
int channelCount = 0;
@@ -243,18 +327,28 @@ public class NettyClient<I extends Writa
*/
public void sendWritableRequest(InetSocketAddress remoteServer,
WritableRequest<I, V, E, M> request) {
- if (waitingRequestCount.get() == 0) {
+ if (outstandingRequestMap.isEmpty()) {
byteCounter.resetAll();
}
- waitingRequestCount.incrementAndGet();
+ request.setRequestId(requestId.incrementAndGet());
Channel channel = addressChannelMap.get(remoteServer).nextChannel();
if (channel == null) {
throw new IllegalStateException(
"sendWritableRequest: No channel exists for " + remoteServer);
}
- channel.write(request);
+ RequestInfo newRequestInfo = new RequestInfo(remoteServer);
+ RequestInfo oldRequestInfo = outstandingRequestMap.putIfAbsent(
+ request.getRequestId(), newRequestInfo);
+ if (oldRequestInfo != null) {
+ throw new IllegalStateException("sendWritableRequest: Impossible to " +
+ "have a previous request id = " + request.getRequestId() + ", " +
+ "request info of " + oldRequestInfo);
+ }
+ ChannelFuture writeFuture = channel.write(request);
+ newRequestInfo.setWriteFuture(writeFuture);
+
if (limitNumberOfOpenRequests &&
- waitingRequestCount.get() > maxNumberOfOpenRequests) {
+ outstandingRequestMap.size() > maxNumberOfOpenRequests) {
waitSomeRequests(maxNumberOfOpenRequests);
}
}
@@ -279,16 +373,25 @@ public class NettyClient<I extends Writa
* complete
*/
private void waitSomeRequests(int maxOpenRequests) {
- synchronized (waitingRequestCount) {
- while (waitingRequestCount.get() > maxOpenRequests) {
- if (LOG.isInfoEnabled()) {
+ synchronized (outstandingRequestMap) {
+ while (outstandingRequestMap.size() > maxOpenRequests) {
+ if (LOG.isInfoEnabled() && requestLogger.isPrintable()) {
LOG.info("waitSomeRequests: Waiting interval of " +
- WAITING_REQUEST_MSECS + " msecs, " + waitingRequestCount +
+ WAITING_REQUEST_MSECS + " msecs, " +
+ outstandingRequestMap.size() +
" open requests, waiting for it to be <= " + maxOpenRequests +
", " + byteCounter.getMetrics());
+
+ if (outstandingRequestMap.size() < MAX_REQUESTS_TO_LIST) {
+ for (Map.Entry<Long, RequestInfo> entry :
+ outstandingRequestMap.entrySet()) {
+ LOG.info("waitSomeRequests: Waiting for request " +
+ entry.getKey() + " - " + entry.getValue());
+ }
+ }
}
try {
- waitingRequestCount.wait(WAITING_REQUEST_MSECS);
+ outstandingRequestMap.wait(WAITING_REQUEST_MSECS);
} catch (InterruptedException e) {
LOG.error("waitFutures: Got unexpected InterruptedException", e);
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java Wed Aug 15 20:07:32 2012
@@ -59,7 +59,9 @@ public class NettyServer<I extends Writa
V extends Writable, E extends Writable,
M extends Writable> {
/** Default maximum thread pool size */
- public static final int DEFAULT_MAXIMUM_THREAD_POOL_SIZE = 32;
+ public static final int MAXIMUM_THREAD_POOL_SIZE_DEFAULT = 32;
+ /** Default TCP backlog */
+ public static final int TCP_BACKLOG_DEFAULT = 100;
/** Class logger */
private static final Logger LOG = Logger.getLogger(NettyServer.class);
/** Configuration */
@@ -125,8 +127,7 @@ public class NettyServer<I extends Writa
throw new IllegalStateException("NettyServer: unable to get hostname");
}
maximumPoolSize = conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
- DEFAULT_MAXIMUM_THREAD_POOL_SIZE);
- Executors.newCachedThreadPool(workerFactory);
+ MAXIMUM_THREAD_POOL_SIZE_DEFAULT);
channelFactory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(bossFactory),
@@ -140,6 +141,11 @@ public class NettyServer<I extends Writa
public void start() {
bootstrap = new ServerBootstrap(channelFactory);
// Set up the pipeline factory.
+ bootstrap.setOption("child.keepAlive", true);
+ bootstrap.setOption("child.tcpNoDelay", true);
+ bootstrap.setOption("child.sendBufferSize", sendBufferSize);
+ bootstrap.setOption("child.receiveBufferSize", receiveBufferSize);
+ bootstrap.setOption("backlog", TCP_BACKLOG_DEFAULT);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
@@ -171,8 +177,6 @@ public class NettyServer<I extends Writa
// preserving debugability from the port number alone.
// Round up the max number of workers to the next power of 10 and use
// it as a constant to increase the port number with.
- boolean tcpNoDelay = false;
- boolean keepAlive = false;
while (bindAttempts < maxRpcPortBindAttempts) {
this.myAddress = new InetSocketAddress(localHostname, bindPort);
if (failFirstPortBindingAttempt && bindAttempts == 0) {
@@ -189,10 +193,6 @@ public class NettyServer<I extends Writa
try {
Channel ch = bootstrap.bind(myAddress);
accepted.add(ch);
- tcpNoDelay = ch.getConfig().setOption("tcpNoDelay", true);
- keepAlive = ch.getConfig().setOption("keepAlive", true);
- ch.getConfig().setOption("sendBufferSize", sendBufferSize);
- ch.getConfig().setOption("receiveBufferSize", receiveBufferSize);
break;
} catch (ChannelException e) {
@@ -212,9 +212,9 @@ public class NettyServer<I extends Writa
LOG.info("start: Started server " +
"communication server: " + myAddress + " with up to " +
maximumPoolSize + " threads on bind attempt " + bindAttempts +
- " with tcpNoDelay = " + tcpNoDelay + " and keepAlive = " +
- keepAlive + " sendBufferSize = " + sendBufferSize +
- " receiveBufferSize = " + receiveBufferSize);
+ " with sendBufferSize = " + sendBufferSize +
+ " receiveBufferSize = " + receiveBufferSize + " backlog = " +
+ bootstrap.getOption("backlog"));
}
}
@@ -222,6 +222,9 @@ public class NettyServer<I extends Writa
* Stop the server.
*/
public void stop() {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("stop: Halting netty server");
+ }
accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
bootstrap.releaseExternalResources();
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java Wed Aug 15 20:07:32 2012
@@ -18,6 +18,8 @@
package org.apache.giraph.comm;
+import com.google.common.collect.Sets;
+import java.util.Set;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.messages.MessageStoreByPartition;
import org.apache.giraph.comm.messages.SendPartitionCurrentMessagesRequest;
@@ -34,13 +36,11 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@@ -115,8 +115,8 @@ public class NettyWorkerClient<I extends
public void fixPartitionIdToSocketAddrMap() {
// 1. Fix all the cached inet addresses (remove all changed entries)
// 2. Connect to any new RPC servers
- List<InetSocketAddress> addresses =
- Lists.newArrayListWithCapacity(service.getPartitionOwners().size());
+ Set<InetSocketAddress> addresses =
+ Sets.newHashSetWithExpectedSize(service.getPartitionOwners().size());
for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
InetSocketAddress address =
partitionIndexAddressMap.get(
Added: giraph/trunk/src/main/java/org/apache/giraph/comm/RequestInfo.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestInfo.java?rev=1373609&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/RequestInfo.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/RequestInfo.java Wed Aug 15 20:07:32 2012
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import java.net.InetSocketAddress;
+import java.util.Date;
+import org.jboss.netty.channel.ChannelFuture;
+
+/**
+ * Help track requests throughout the system
+ */
+public class RequestInfo {
+ /** Destination of the request */
+ private final InetSocketAddress destinationAddress;
+ /** When the request was started */
+ private final long startedMsecs;
+ /** Future of the write of this request*/
+ private volatile ChannelFuture writeFuture;
+
+ /**
+ * Constructor.
+ *
+ * @param destinationAddress Destination of the request
+ */
+ public RequestInfo(InetSocketAddress destinationAddress) {
+ this.destinationAddress = destinationAddress;
+ this.startedMsecs = System.currentTimeMillis();
+ }
+
+ public InetSocketAddress getAddress() {
+ return destinationAddress;
+ }
+
+ public long getStartedMsecs() {
+ return startedMsecs;
+ }
+
+ /**
+ * Get the elapsed time since the request started.
+ *
+ * @return Msecs since the request was started
+ */
+ public long getElapsedMsecs() {
+ return System.currentTimeMillis() - startedMsecs;
+ }
+
+ public void setWriteFuture(ChannelFuture writeFuture) {
+ this.writeFuture = writeFuture;
+ }
+
+ public ChannelFuture getWriteFuture() {
+ return writeFuture;
+ }
+
+ @Override
+ public String toString() {
+ return "(destAddr=" + destinationAddress +
+ ",startDate=" + new Date(startedMsecs) + ",elapsedMsecs=" +
+ getElapsedMsecs() +
+ ((writeFuture == null) ? ")" :
+ ",writeDone=" + writeFuture.isDone() +
+ ",writeSuccess=" + writeFuture.isSuccess() + ")");
+ }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java Wed Aug 15 20:07:32 2012
@@ -24,6 +24,8 @@ import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
@@ -65,9 +67,34 @@ public class RequestServerHandler<I exte
(WritableRequest<I, V, E, M>) e.getMessage();
writableRequest.doRequest(serverData);
- // Send the success response
- ChannelBuffer buffer = ChannelBuffers.directBuffer(1);
+ // Send the success response with the request id
+ ChannelBuffer buffer = ChannelBuffers.directBuffer(9);
+ buffer.writeLong(writableRequest.getRequestId());
buffer.writeByte(0);
e.getChannel().write(buffer);
}
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx,
+ ChannelStateEvent e) throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("channelConnected: Connected the channel on " +
+ ctx.getChannel().getRemoteAddress());
+ }
+ }
+
+ @Override
+ public void channelClosed(ChannelHandlerContext ctx,
+ ChannelStateEvent e) throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("channelClosed: Closed the channel on " +
+ ctx.getChannel().getRemoteAddress());
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ throw new IllegalStateException("exceptionCaught: Channel failed with " +
+ "remote address " + ctx.getChannel().getRemoteAddress(), e.getCause());
+ }
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java Wed Aug 15 20:07:32 2012
@@ -19,12 +19,13 @@
package org.apache.giraph.comm;
import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentMap;
import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
@@ -36,16 +37,17 @@ public class ResponseClientHandler exten
/** Class logger */
private static final Logger LOG =
Logger.getLogger(ResponseClientHandler.class);
- /** Keep track on the responses received */
- private final AtomicInteger waitingRequestCount;
+ /** Outstanding request map */
+ private final ConcurrentMap<Long, RequestInfo> outstandingRequestMap;
/**
* Constructor.
*
- * @param waitingRequestCount Number of requests to wait for
+ * @param outstandingRequestMap Map of outstanding requests
*/
- public ResponseClientHandler(AtomicInteger waitingRequestCount) {
- this.waitingRequestCount = waitingRequestCount;
+ public ResponseClientHandler(
+ ConcurrentMap<Long, RequestInfo> outstandingRequestMap) {
+ this.outstandingRequestMap = outstandingRequestMap;
}
@Override
@@ -57,48 +59,52 @@ public class ResponseClientHandler exten
}
ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
ChannelBufferInputStream inputStream = new ChannelBufferInputStream(buffer);
+ long requestId = -1;
int response = -1;
try {
- for (int i = 0; i < buffer.capacity(); ++i) {
- try {
- response = inputStream.readByte();
- } catch (IOException e) {
- throw new IllegalStateException(
- "messageReceived: Got IOException ", e);
- }
- if (response != 0) {
- throw new IllegalStateException(
- "messageReceived: Got illegal response " + response);
- }
- }
- } finally {
- try {
- inputStream.close();
- } catch (IOException e) {
- throw new IllegalStateException("messageReceived: Got IOException ", e);
- }
+ requestId = inputStream.readLong();
+ response = inputStream.readByte();
+ inputStream.close();
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "messageReceived: Got IOException ", e);
+ }
+ if (response != 0) {
+ throw new IllegalStateException(
+ "messageReceived: Got illegal response " + response);
}
- synchronized (waitingRequestCount) {
- int currentRequestCount =
- waitingRequestCount.addAndGet(-1 * buffer.capacity());
- if (currentRequestCount < 0) {
- throw new IllegalStateException("messageReceived: Impossible to " +
- "have negative currentRequestCount " + currentRequestCount);
- } else if (currentRequestCount == 0) {
- waitingRequestCount.notify();
- }
+ RequestInfo requestInfo = outstandingRequestMap.remove(requestId);
+ if (requestInfo == null) {
+ throw new IllegalStateException("messageReceived: Impossible to " +
+ "have a non-registered requestId " + requestId);
+ } else {
if (LOG.isDebugEnabled()) {
- LOG.debug("messageReceived: currentRequestCount = " +
- currentRequestCount + ", bytes = " + buffer.capacity());
+ LOG.debug("messageReceived: Processed request id = " + requestId +
+ " " + requestInfo + ". Waiting on " +
+ outstandingRequestMap.size() +
+ " requests, bytes = " + buffer.capacity());
}
}
+
+ // Help NettyClient#waitSomeRequests() to finish faster
+ synchronized (outstandingRequestMap) {
+ outstandingRequestMap.notifyAll();
+ }
+ }
+
+ @Override
+ public void channelClosed(ChannelHandlerContext ctx,
+ ChannelStateEvent e) throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("channelClosed: Closed the channel on " +
+ ctx.getChannel().getRemoteAddress());
+ }
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
throw new IllegalStateException("exceptionCaught: Channel failed with " +
- "remote address " + ctx.getChannel().getRemoteAddress() + " with " +
- "cause " + e.getCause());
+ "remote address " + ctx.getChannel().getRemoteAddress(), e.getCause());
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java Wed Aug 15 20:07:32 2012
@@ -20,7 +20,6 @@ package org.apache.giraph.comm;
import org.apache.giraph.comm.RequestRegistry.Type;
import org.apache.giraph.graph.BspUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
@@ -47,7 +46,7 @@ import java.util.Map.Entry;
@SuppressWarnings("rawtypes")
public class SendPartitionMessagesRequest<I extends WritableComparable,
V extends Writable, E extends Writable,
- M extends Writable> implements WritableRequest<I, V, E, M> {
+ M extends Writable> extends WritableRequest<I, V, E, M> {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(SendPartitionMessagesRequest.class);
@@ -55,8 +54,6 @@ public class SendPartitionMessagesReques
private int partitionId;
/** Messages sent for a partition */
private Map<I, Collection<M>> vertexIdMessages;
- /** Configuration */
- private Configuration conf;
/**
* Constructor used for reflection only
@@ -76,17 +73,17 @@ public class SendPartitionMessagesReques
}
@Override
- public void readFields(DataInput input) throws IOException {
+ public void readFieldsRequest(DataInput input) throws IOException {
partitionId = input.readInt();
int vertexIdMessagesSize = input.readInt();
vertexIdMessages = Maps.newHashMapWithExpectedSize(vertexIdMessagesSize);
for (int i = 0; i < vertexIdMessagesSize; ++i) {
- I vertexId = BspUtils.<I>createVertexId(conf);
+ I vertexId = BspUtils.<I>createVertexId(getConf());
vertexId.readFields(input);
int messageCount = input.readInt();
List<M> messageList = Lists.newArrayListWithCapacity(messageCount);
for (int j = 0; j < messageCount; ++j) {
- M message = BspUtils.<M>createMessageValue(conf);
+ M message = BspUtils.<M>createMessageValue(getConf());
message.readFields(input);
messageList.add(message);
}
@@ -98,7 +95,7 @@ public class SendPartitionMessagesReques
}
@Override
- public void write(DataOutput output) throws IOException {
+ public void writeRequest(DataOutput output) throws IOException {
output.writeInt(partitionId);
output.writeInt(vertexIdMessages.size());
for (Entry<I, Collection<M>> entry : vertexIdMessages.entrySet()) {
@@ -125,16 +122,6 @@ public class SendPartitionMessagesReques
}
}
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
/**
* Get id of partition
*
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMutationsRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMutationsRequest.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMutationsRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMutationsRequest.java Wed Aug 15 20:07:32 2012
@@ -28,7 +28,6 @@ import java.util.concurrent.ConcurrentHa
import org.apache.giraph.comm.RequestRegistry.Type;
import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.VertexMutations;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
@@ -46,7 +45,7 @@ import com.google.common.collect.Maps;
@SuppressWarnings("rawtypes")
public class SendPartitionMutationsRequest<I extends WritableComparable,
V extends Writable, E extends Writable,
- M extends Writable> implements WritableRequest<I, V, E, M> {
+ M extends Writable> extends WritableRequest<I, V, E, M> {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(SendPartitionMutationsRequest.class);
@@ -54,8 +53,6 @@ public class SendPartitionMutationsReque
private int partitionId;
/** Mutations sent for a partition */
private Map<I, VertexMutations<I, V, E, M>> vertexIdMutations;
- /** Configuration */
- private Configuration conf;
/**
* Constructor used for reflection only
@@ -68,23 +65,24 @@ public class SendPartitionMutationsReque
* @param partitionId Partition to send the request to
* @param vertexIdMutations Map of mutations to send
*/
- public SendPartitionMutationsRequest(int partitionId,
+ public SendPartitionMutationsRequest(
+ int partitionId,
Map<I, VertexMutations<I, V, E, M>> vertexIdMutations) {
this.partitionId = partitionId;
this.vertexIdMutations = vertexIdMutations;
}
@Override
- public void readFields(DataInput input) throws IOException {
+ public void readFieldsRequest(DataInput input) throws IOException {
partitionId = input.readInt();
int vertexIdMutationsSize = input.readInt();
vertexIdMutations = Maps.newHashMapWithExpectedSize(vertexIdMutationsSize);
for (int i = 0; i < vertexIdMutationsSize; ++i) {
- I vertexId = BspUtils.<I>createVertexId(conf);
+ I vertexId = BspUtils.<I>createVertexId(getConf());
vertexId.readFields(input);
VertexMutations<I, V, E, M> vertexMutations =
new VertexMutations<I, V, E, M>();
- vertexMutations.setConf(conf);
+ vertexMutations.setConf(getConf());
vertexMutations.readFields(input);
if (vertexIdMutations.put(vertexId, vertexMutations) != null) {
throw new IllegalStateException(
@@ -94,7 +92,7 @@ public class SendPartitionMutationsReque
}
@Override
- public void write(DataOutput output) throws IOException {
+ public void writeRequest(DataOutput output) throws IOException {
output.writeInt(partitionId);
output.writeInt(vertexIdMutations.size());
for (Entry<I, VertexMutations<I, V, E, M>> entry :
@@ -129,14 +127,4 @@ public class SendPartitionMutationsReque
}
}
}
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java Wed Aug 15 20:07:32 2012
@@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentHa
import org.apache.giraph.comm.RequestRegistry.Type;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.BspUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
@@ -45,7 +44,7 @@ import com.google.common.collect.Lists;
@SuppressWarnings("rawtypes")
public class SendVertexRequest<I extends WritableComparable,
V extends Writable, E extends Writable,
- M extends Writable> implements WritableRequest<I, V, E, M> {
+ M extends Writable> extends WritableRequest<I, V, E, M> {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(SendVertexRequest.class);
@@ -53,8 +52,6 @@ public class SendVertexRequest<I extends
private int partitionId;
/** List of vertices to be stored on this partition */
private Collection<Vertex<I, V, E, M>> vertices;
- /** Configuration */
- private Configuration conf;
/**
* Constructor used for reflection only
@@ -67,26 +64,26 @@ public class SendVertexRequest<I extends
* @param partitionId Partition to send the request to
* @param vertices Vertices to send
*/
- public SendVertexRequest(
- int partitionId, Collection<Vertex<I, V, E, M>> vertices) {
+ public SendVertexRequest(int partitionId,
+ Collection<Vertex<I, V, E, M>> vertices) {
this.partitionId = partitionId;
this.vertices = vertices;
}
@Override
- public void readFields(DataInput input) throws IOException {
+ public void readFieldsRequest(DataInput input) throws IOException {
partitionId = input.readInt();
int verticesCount = input.readInt();
vertices = Lists.newArrayListWithCapacity(verticesCount);
for (int i = 0; i < verticesCount; ++i) {
- Vertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
+ Vertex<I, V, E, M> vertex = BspUtils.createVertex(getConf());
vertex.readFields(input);
vertices.add(vertex);
}
}
@Override
- public void write(DataOutput output) throws IOException {
+ public void writeRequest(DataOutput output) throws IOException {
output.writeInt(partitionId);
output.writeInt(vertices.size());
for (Vertex<I, V, E, M> vertex : vertices) {
@@ -121,15 +118,5 @@ public class SendVertexRequest<I extends
vertexMap.addAll(vertices);
}
}
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java Wed Aug 15 20:07:32 2012
@@ -18,8 +18,12 @@
package org.apache.giraph.comm;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import org.apache.giraph.comm.RequestRegistry.Type;
import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -32,19 +36,97 @@ import org.apache.hadoop.io.WritableComp
* @param <M> Message data
*/
@SuppressWarnings("rawtypes")
-public interface WritableRequest<I extends WritableComparable,
+public abstract class WritableRequest<I extends WritableComparable,
V extends Writable, E extends Writable,
- M extends Writable> extends Writable, Configurable {
+ M extends Writable> implements Writable, Configurable {
+ /** Configuration */
+ private Configuration conf;
+ /** Request id; -1 until {@link #setRequestId(long)} is called */
+ private long requestId = -1;
+
+ /**
+ * Get the id of this request.
+ *
+ * @return Request id, or -1 if it has not been set
+ */
+ public long getRequestId() {
+ return requestId;
+ }
+
+ /**
+ * Set the id of this request. The id is written ahead of the
+ * request-specific fields by {@link #write(DataOutput)}.
+ *
+ * @param requestId Request id
+ */
+ public void setRequestId(long requestId) {
+ this.requestId = requestId;
+ }
+
/**
* Get the type of the request
*
* @return Request type
*/
- Type getType();
+ public abstract Type getType();
+
+ /**
+ * Deserialize the request-specific fields (everything after the request
+ * id, which {@link #readFields(DataInput)} has already consumed).
+ *
+ * @param input Input to read fields from
+ * @throws IOException If reading from the input fails
+ */
+ protected abstract void readFieldsRequest(DataInput input)
+ throws IOException;
+
+ /**
+ * Serialize the request-specific fields (everything after the request
+ * id, which {@link #write(DataOutput)} has already written).
+ *
+ * @param output Output to write the request to
+ * @throws IOException If writing to the output fails
+ */
+ protected abstract void writeRequest(DataOutput output) throws IOException;
+
/**
* Execute the request
*
* @param serverData Accessible data that can be mutated per the request
*/
- void doRequest(ServerData<I, V, E, M> serverData);
+ public abstract void doRequest(ServerData<I, V, E, M> serverData);
+
+ @Override
+ public final Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public final void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Read the request id followed by the request-specific fields.
+ *
+ * @param input Input to read from
+ * @throws IOException If reading from the input fails
+ */
+ @Override
+ public final void readFields(DataInput input) throws IOException {
+ requestId = input.readLong();
+ readFieldsRequest(input);
+ }
+
+ /**
+ * Write the request id followed by the request-specific fields.
+ *
+ * @param output Output to write to
+ * @throws IOException If writing to the output fails
+ */
+ @Override
+ public final void write(DataOutput output) throws IOException {
+ output.writeLong(requestId);
+ writeRequest(output);
+ }
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Wed Aug 15 20:07:32 2012
@@ -18,6 +18,7 @@
package org.apache.giraph.graph;
+import com.google.common.collect.Sets;
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.BspInputFormat;
import org.apache.giraph.bsp.CentralizedServiceMaster;
@@ -89,6 +90,8 @@ public class BspServiceMaster<I extends
implements CentralizedServiceMaster<I, V, E, M> {
/** Counter group name for the Giraph statistics */
public static final String GIRAPH_STATS_COUNTER_GROUP_NAME = "Giraph Stats";
+ /** Print remaining worker names once fewer than this many are left */
+ public static final int MAX_PRINTABLE_REMAINING_WORKERS = 10;
/** Class logger */
private static final Logger LOG = Logger.getLogger(BspServiceMaster.class);
/** Superstep counter */
@@ -344,8 +347,7 @@ public class BspServiceMaster<I extends
} catch (KeeperException e) {
throw new IllegalStateException("getWorkers: KeeperException", e);
} catch (InterruptedException e) {
- throw new IllegalStateException("getWorkers: IllegalStateException"
- , e);
+ throw new IllegalStateException("getWorkers: IllegalStateException", e);
}
try {
@@ -357,8 +359,7 @@ public class BspServiceMaster<I extends
} catch (KeeperException e) {
throw new IllegalStateException("getWorkers: KeeperException", e);
} catch (InterruptedException e) {
- throw new IllegalStateException("getWorkers: IllegalStateException"
- , e);
+ throw new IllegalStateException("getWorkers: IllegalStateException", e);
}
List<WorkerInfo> currentHealthyWorkerInfoList =
@@ -1339,6 +1340,12 @@ public class BspServiceMaster<I extends
" out of " + workerInfoList.size() +
" workers finished on superstep " +
getSuperstep() + " on path " + finishedWorkerPath);
+ if (workerInfoList.size() - finishedHostnameIdList.size() <
+ MAX_PRINTABLE_REMAINING_WORKERS) {
+ Set<String> remainingWorkers = Sets.newHashSet(hostnameIdList);
+ remainingWorkers.removeAll(finishedHostnameIdList);
+ LOG.info("barrierOnWorkerList: Waiting on " + remainingWorkers);
+ }
}
getContext().setStatus(getGraphMapper().getMapFunctions() + " - " +
finishedHostnameIdList.size() +
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Wed Aug 15 20:07:32 2012
@@ -159,6 +159,12 @@ public class GiraphJob {
/** Default is to use RPC, not netty */
public static final boolean USE_NETTY_DEFAULT = false;
+ /** Config key: maximum number of Netty connection failures */
+ public static final String NETTY_MAX_CONNECTION_FAILURES =
+ "giraph.nettyMaxConnectionFailures";
+ /** Default value for {@link #NETTY_MAX_CONNECTION_FAILURES} */
+ public static final int NETTY_MAX_CONNECTION_FAILURES_DEFAULT = 1000;
+
/** Initial port to start using for the RPC communication */
public static final String RPC_INITIAL_PORT = "giraph.rpcInitialPort";
/** Default port to start using for the RPC communication */
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java Wed Aug 15 20:07:32 2012
@@ -53,8 +53,13 @@ public class HashWorkerPartitioner<I ext
@Override
public PartitionOwner getPartitionOwner(I vertexId) {
- return partitionOwnerList.get(Math.abs(vertexId.hashCode()) %
- partitionOwnerList.size());
+ synchronized (partitionOwnerList) {
+ // Take the remainder before abs(): Math.abs(Integer.MIN_VALUE) is still
+ // negative and would yield a negative index. For every other hash code
+ // this selects the same owner as abs(hash) % size.
+ return partitionOwnerList.get(Math.abs(vertexId.hashCode() %
+ partitionOwnerList.size()));
+ }
}
@Override
@@ -70,8 +75,10 @@ public class HashWorkerPartitioner<I ext
WorkerInfo myWorkerInfo,
Collection<? extends PartitionOwner> masterSetPartitionOwners,
Map<Integer, Partition<I, V, E, M>> partitionMap) {
- partitionOwnerList.clear();
- partitionOwnerList.addAll(masterSetPartitionOwners);
+ synchronized (partitionOwnerList) {
+ partitionOwnerList.clear();
+ partitionOwnerList.addAll(masterSetPartitionOwners);
+ }
Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>();
Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap =
@@ -111,6 +118,10 @@ public class HashWorkerPartitioner<I ext
@Override
public Collection<? extends PartitionOwner> getPartitionOwners() {
- return partitionOwnerList;
+ // Hand out a snapshot: returning the live list would let callers iterate
+ // it without the lock while updatePartitionOwners() clears and refills it.
+ synchronized (partitionOwnerList) {
+ return new java.util.ArrayList<PartitionOwner>(partitionOwnerList);
+ }
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/utils/TimedLogger.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/TimedLogger.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/TimedLogger.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/TimedLogger.java Wed Aug 15 20:07:32 2012
@@ -48,9 +48,25 @@ public class TimedLogger {
* @param msg Message to print
*/
public void info(String msg) {
- if (System.currentTimeMillis() > lastPrint + msecs) {
+ if (isPrintable()) {
log.info(msg);
+ }
+ }
+
+ /**
+ * Is the log message printable (minimum interval met)? A true result
+ * also records the current time as the last print time, so it starts
+ * a new interval.
+ *
+ * @return True if the message is printable
+ */
+ public boolean isPrintable() {
+ long now = System.currentTimeMillis();
+ if (now > lastPrint + msecs) {
- lastPrint = System.currentTimeMillis();
+ lastPrint = now;
+ return true;
}
+
+ return false;
}
}
Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java Wed Aug 15 20:07:32 2012
@@ -18,6 +18,8 @@
package org.apache.giraph.comm;
+import com.google.common.collect.Sets;
+import java.util.Set;
import org.apache.giraph.comm.messages.SimpleMessageStore;
import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.utils.MockUtils;
@@ -104,8 +106,10 @@ public class ConnectionTest {
NettyClient<IntWritable, IntWritable, IntWritable, IntWritable> client =
new NettyClient<IntWritable, IntWritable, IntWritable,
IntWritable>(context);
- List<InetSocketAddress> serverAddresses =
- new ArrayList<InetSocketAddress>();
+ Set<InetSocketAddress> serverAddresses = Sets.newHashSet();
+ serverAddresses.add(server1.getMyAddress());
+ serverAddresses.add(server2.getMyAddress());
+ serverAddresses.add(server3.getMyAddress());
client.connectAllAddresses(serverAddresses);
client.stop();