You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/08/22 00:17:57 UTC
svn commit: r1375824 - in /giraph/trunk: ./
src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/graph/
src/test/java/org/apache/giraph/comm/
Author: aching
Date: Tue Aug 21 22:17:56 2012
New Revision: 1375824
URL: http://svn.apache.org/viewvc?rev=1375824&view=rev
Log:
GIRAPH-306: Netty requests should be reliable and implement exactly
once semantics. (aching)
Added:
giraph/trunk/src/main/java/org/apache/giraph/comm/AddressRequestIdGenerator.java
giraph/trunk/src/main/java/org/apache/giraph/comm/ClientRequestId.java
giraph/trunk/src/main/java/org/apache/giraph/comm/IncreasingBitSet.java
giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerRequestReservedMap.java
giraph/trunk/src/test/java/org/apache/giraph/comm/IncreasingBitSetTest.java
giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
- copied, changed from r1375717, giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/comm/ChannelRotater.java
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java
giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java
giraph/trunk/src/main/java/org/apache/giraph/comm/RequestInfo.java
giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java
giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java
giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java
giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Aug 21 22:17:56 2012
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-306: Netty requests should be reliable and implement exactly
+ once semantics. (aching)
+
GIRAPH-309: Message count is wrong. (aching via apresta)
GIRAPH-246: Periodic worker calls to context.progress() will prevent
Added: giraph/trunk/src/main/java/org/apache/giraph/comm/AddressRequestIdGenerator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/AddressRequestIdGenerator.java?rev=1375824&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/AddressRequestIdGenerator.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/AddressRequestIdGenerator.java Tue Aug 21 22:17:56 2012
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import com.google.common.collect.Maps;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+/**
+ * Generate different request ids based on the address of the well known
+ * port on the workers. Ids are tracked independently per address: the
+ * first id handed out for an address is 0 and every later call for that
+ * address returns the previous id plus one. The generator does no
+ * synchronization of its own (see getNextRequestId), so an instance must
+ * not be shared across threads without external locking.
+ */
+public class AddressRequestIdGenerator {
+ /** Last request id handed out per address (no entry until first use) */
+ private final Map<InetSocketAddress, Long> addressRequestGeneratorMap =
+ Maps.newHashMap();
+
+ /**
+ * Get the next request id for a given destination. Not thread-safe.
+ *
+ * @param address Address of the worker (consistent during a superstep)
+ * @return Valid request id: 0 on the first call for this address,
+ * otherwise one more than the id returned by the previous call
+ * @throws IllegalStateException if the map reports a previous value for
+ * an address that was just looked up as absent
+ */
+ public Long getNextRequestId(InetSocketAddress address) {
+ Long requestGenerator = addressRequestGeneratorMap.get(address);
+ if (requestGenerator == null) { // first request for this address
+ requestGenerator = Long.valueOf(0);
+ if (addressRequestGeneratorMap.put(address, requestGenerator) != null) {
+ throw new IllegalStateException("getNextRequestId: Illegal put for " +
+ "address " + address);
+ }
+ return requestGenerator;
+ }
+
+ requestGenerator = requestGenerator + 1; // unbox, add one, re-box
+ addressRequestGeneratorMap.put(address, requestGenerator);
+ return requestGenerator;
+ }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/ChannelRotater.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ChannelRotater.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/ChannelRotater.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/ChannelRotater.java Tue Aug 21 22:17:56 2012
@@ -51,11 +51,29 @@ public class ChannelRotater {
throw new IllegalArgumentException("nextChannel: No channels exist!");
}
+ incrementIndex();
+ return channelList.get(index);
+ }
+
+ /**
+ * Remove the last channel that was given out, i.e. the element at the
+ * current index (nextChannel() advances the index and then returns the
+ * element at it). After the removal the index is advanced with wrapping.
+ * NOTE(review): presumably channelList is a java.util.List, in which case
+ * remove(index) already shifts the following channels down by one and the
+ * extra increment skips one channel in the rotation - confirm intended.
+ *
+ * @return Return the removed channel
+ */
+ public Channel removeLast() {
+ Channel channel = channelList.remove(index);
+ incrementIndex();
+ return channel;
+ }
+
+ /**
+ * Increment the channel index with wrapping
+ */
+ private void incrementIndex() {
++index;
if (index >= channelList.size()) {
index = 0;
}
- return channelList.get(index);
}
/**
Added: giraph/trunk/src/main/java/org/apache/giraph/comm/ClientRequestId.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ClientRequestId.java?rev=1375824&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/ClientRequestId.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/ClientRequestId.java Tue Aug 21 22:17:56 2012
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+/**
+ * Simple immutable object to use for tracking requests uniquely. This
+ * object is guaranteed to be unique for a given client (based on the
+ * destination worker and the request).
+ * Both fields are final and assigned once in the constructor. Two
+ * instances are equal exactly when their destination worker id and
+ * request id match, and hashCode() is computed from the same two fields,
+ * so instances are usable as hash map keys.
+ */
+public class ClientRequestId {
+ /** Destination worker id */
+ private final int destinationWorkerId;
+ /** Request id */
+ private final long requestId;
+
+ /**
+ * Constructor.
+ *
+ * @param destinationWorkerId Destination worker id
+ * @param requestId Request id
+ */
+ public ClientRequestId(int destinationWorkerId, long requestId) {
+ this.destinationWorkerId = destinationWorkerId;
+ this.requestId = requestId;
+ }
+
+ public int getDestinationWorkerId() {
+ return destinationWorkerId;
+ }
+
+ public long getRequestId() {
+ return requestId;
+ }
+
+ @Override
+ public int hashCode() {
+ return (29 * destinationWorkerId) + (int) (57 * requestId); // long product narrowed to int
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof ClientRequestId) { // false for null and other types
+ ClientRequestId otherObj = (ClientRequestId) other;
+ if (otherObj.getRequestId() == requestId &&
+ otherObj.getDestinationWorkerId() == destinationWorkerId) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "(destWorker=" + destinationWorkerId + ",reqId=" + requestId + ")";
+ }
+}
Added: giraph/trunk/src/main/java/org/apache/giraph/comm/IncreasingBitSet.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/IncreasingBitSet.java?rev=1375824&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/IncreasingBitSet.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/IncreasingBitSet.java Tue Aug 21 22:17:56 2012
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import java.util.BitSet;
+
+/**
+ * Bit set optimized for increasing longs to save storage space.
+ * The general idea is that even though some keys will be added out-of-order,
+ * there is a base key that keeps increasing so that the bit set doesn't get
+ * very big. When there are enough set bits, the bit set gets compacted.
+ * A key is stored as the bit at offset (key - lastBaseKey); every key below
+ * lastBaseKey is treated as already present. Compaction happens inside
+ * add() once the run of set bits starting at offset 0 is at least
+ * MIN_BITS_TO_SHIFT long: that run is dropped and lastBaseKey advances by
+ * its length.
+ * Thread-safe (all public methods are synchronized on the instance).
+ */
+public class IncreasingBitSet {
+ /** Minimum number of bits to shift */
+ public static final int MIN_BITS_TO_SHIFT = 64 * 1024;
+ /** Bit set used */
+ private BitSet bitSet = new BitSet();
+ /** Last base key (all keys below this have been accepted) */
+ private long lastBaseKey = 0;
+
+ /**
+ * Add a key if it is possible.
+ *
+ * @param key Key to add
+ * @return True if the key was added, false otherwise (the key is below
+ * the base key or its bit is already set)
+ * @throws IllegalArgumentException if key is more than Integer.MAX_VALUE
+ * above the current base key
+ */
+ public synchronized boolean add(long key) {
+ long remainder = key - lastBaseKey; // offset of the key from the base
+ checkLegalKey(remainder);
+
+ if (remainder < 0) { // below the base: counted as already accepted
+ return false;
+ }
+ if (bitSet.get((int) remainder)) {
+ return false;
+ }
+ bitSet.set((int) remainder);
+ int nextClearBit = bitSet.nextClearBit(0); // length of the leading run of set bits
+ if (nextClearBit >= MIN_BITS_TO_SHIFT) {
+ bitSet = bitSet.get(nextClearBit, // keep only bits at or above the first clear bit
+ Math.max(nextClearBit, bitSet.length()));
+ lastBaseKey += nextClearBit;
+ }
+ return true;
+ }
+
+ /**
+ * Get the number of set bits. Keys below the base key are all counted,
+ * so the result is the backing bit set's cardinality plus lastBaseKey.
+ *
+ * @return Number of set bits
+ */
+ public synchronized long cardinality() {
+ long size = bitSet.cardinality();
+ return size + lastBaseKey;
+ }
+
+ /**
+ * Get the size of the bit set, as reported by the backing BitSet's
+ * size() (this does not include keys below the base key).
+ *
+ * @return Size of the bit set
+ */
+ public synchronized int size() {
+ return bitSet.size();
+ }
+
+ /**
+ * Check for existence of a key
+ *
+ * @param key Key to check for
+ * @return True if the key exists, false otherwise; any key below the
+ * base key is reported as existing
+ * @throws IllegalArgumentException if key is more than Integer.MAX_VALUE
+ * above the current base key
+ */
+ public synchronized boolean has(long key) {
+ long remainder = key - lastBaseKey;
+ checkLegalKey(remainder);
+
+ if (remainder < 0) {
+ return true;
+ }
+ return bitSet.get((int) remainder);
+ }
+
+ /**
+ * Get the last base key (mainly for debugging).
+ *
+ * @return Last base key
+ */
+ public synchronized long getLastBaseKey() {
+ return lastBaseKey;
+ }
+
+ /**
+ * Check the remainder for validity. Only the upper bound is checked, so
+ * negative remainders pass and are handled by the callers.
+ * NOTE(review): the exception message opens a parenthesis that it never
+ * closes and reads "Impossible that to add key" - consider rewording.
+ *
+ * @param remainder Remainder to check
+ * @throws IllegalArgumentException if remainder exceeds Integer.MAX_VALUE
+ */
+ private void checkLegalKey(long remainder) {
+ if (remainder > Integer.MAX_VALUE) {
+ throw new IllegalArgumentException(
+ "checkLegalKey: Impossible that to add key " +
+ (remainder + lastBaseKey) + " with base " +
+ lastBaseKey + " since the " +
+ "spread is too large (> " + Integer.MAX_VALUE);
+ }
+ }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java Tue Aug 21 22:17:56 2012
@@ -21,11 +21,13 @@ package org.apache.giraph.comm;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
@@ -59,8 +61,6 @@ import org.jboss.netty.handler.codec.fra
public class NettyClient<I extends WritableComparable,
V extends Writable, E extends Writable,
M extends Writable> {
- /** Msecs to wait between waiting for all requests to finish */
- public static final int WAITING_REQUEST_MSECS = 15000;
/** Do we have a limit on number of open requests we can have */
public static final String LIMIT_NUMBER_OF_OPEN_REQUESTS =
"giraph.waitForRequestsConfirmation";
@@ -87,10 +87,11 @@ public class NettyClient<I extends Writa
*/
private final Map<InetSocketAddress, ChannelRotater> addressChannelMap =
Maps.newHashMap();
- /** Atomic request id, used in outstandingRequestMap */
- private final AtomicInteger requestId = new AtomicInteger(0);
- /** Outstanding request map (tracks all requests). */
- private final ConcurrentMap<Long, RequestInfo> outstandingRequestMap;
+ /**
+ * Request map of client request ids to request information.
+ */
+ private final ConcurrentMap<ClientRequestId, RequestInfo>
+ clientRequestIdRequestInfoMap;
/** Number of channels per server */
private final int channelsPerServer;
/** Byte counter for this client */
@@ -99,15 +100,29 @@ public class NettyClient<I extends Writa
private final int sendBufferSize;
/** Receive buffer size */
private final int receiveBufferSize;
-
/** Do we have a limit on number of open requests */
private final boolean limitNumberOfOpenRequests;
/** Maximum number of requests without confirmation we can have */
private final int maxNumberOfOpenRequests;
/** Maximum number of connnection failures */
private final int maxConnectionFailures;
+ /** Maximum number of milliseconds for a request */
+ private final int maxRequestMilliseconds;
+ /** Maximum number of reconnection failures */
+ private final int maxReconnectionFailures;
+ /** Waiting interval for checking outstanding requests msecs */
+ private final int waitingRequestMsecs;
/** Timed logger for printing request debugging */
private final TimedLogger requestLogger = new TimedLogger(15 * 1000, LOG);
+ /** Boss factory service */
+ private final ExecutorService bossExecutorService;
+ /** Worker factory service */
+ private final ExecutorService workerExecutorService;
+ /** Address request id generator */
+ private final AddressRequestIdGenerator addressRequestIdGenerator =
+ new AddressRequestIdGenerator();
+ /** Client id */
+ private final int clientId;
/**
* Only constructor
@@ -116,7 +131,7 @@ public class NettyClient<I extends Writa
*/
public NettyClient(Mapper<?, ?, ?, ?>.Context context) {
this.context = context;
- Configuration conf = context.getConfiguration();
+ final Configuration conf = context.getConfiguration();
this.channelsPerServer = conf.getInt(
GiraphJob.CHANNELS_PER_SERVER,
GiraphJob.DEFAULT_CHANNELS_PER_SERVER);
@@ -140,20 +155,41 @@ public class NettyClient<I extends Writa
maxNumberOfOpenRequests = -1;
}
+ maxRequestMilliseconds = conf.getInt(
+ GiraphJob.MAX_REQUEST_MILLISECONDS,
+ GiraphJob.MAX_REQUEST_MILLISECONDS_DEFAULT);
+
maxConnectionFailures = conf.getInt(
GiraphJob.NETTY_MAX_CONNECTION_FAILURES,
GiraphJob.NETTY_MAX_CONNECTION_FAILURES_DEFAULT);
+ maxReconnectionFailures = conf.getInt(
+ GiraphJob.MAX_RECONNECT_ATTEMPTS,
+ GiraphJob.MAX_RECONNECT_ATTEMPTS_DEFAULT);
+
+ waitingRequestMsecs = conf.getInt(
+ GiraphJob.WAITING_REQUEST_MSECS,
+ GiraphJob.WAITING_REQUEST_MSECS_DEFAULT);
+
int maxThreads = conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
NettyServer.MAXIMUM_THREAD_POOL_SIZE_DEFAULT);
- outstandingRequestMap =
+ clientRequestIdRequestInfoMap =
new MapMaker().concurrencyLevel(maxThreads).makeMap();
+ bossExecutorService = Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setNameFormat(
+ "Giraph Client Netty Boss #%d").build());
+ workerExecutorService = Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setNameFormat(
+ "Giraph Client Netty Worker #%d").build());
+
+ clientId = conf.getInt("mapred.task.partition", -1);
+
// Configure the client.
bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool(),
+ bossExecutorService,
+ workerExecutorService,
maxThreads));
bootstrap.setOption("connectTimeoutMillis",
MAX_CONNECTION_MILLISECONDS_DEFAULT);
@@ -168,9 +204,9 @@ public class NettyClient<I extends Writa
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
byteCounter,
- new FixedLengthFrameDecoder(9),
+ new FixedLengthFrameDecoder(RequestServerHandler.RESPONSE_BYTES),
new RequestEncoder(),
- new ResponseClientHandler(outstandingRequestMap));
+ new ResponseClientHandler(clientRequestIdRequestInfoMap, conf));
}
});
}
@@ -241,7 +277,7 @@ public class NettyClient<I extends Writa
if (!future.isSuccess()) {
LOG.warn("connectAllAddresses: Future failed " +
"to connect with " + waitingConnection.address + " with " +
- failures + " failures and because of " + future.getCause());
+ failures + " failures because of " + future.getCause());
ChannelFuture connectionFuture =
bootstrap.connect(waitingConnection.address);
@@ -250,9 +286,9 @@ public class NettyClient<I extends Writa
++failures;
} else {
Channel channel = future.getChannel();
- if (LOG.isInfoEnabled()) {
- LOG.info("connectAllAddresses: Connected to " +
- channel.getRemoteAddress());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("connectAllAddresses: Connected to " +
+ channel.getRemoteAddress() + ", open = " + channel.isOpen());
}
if (channel.getRemoteAddress() == null) {
@@ -290,7 +326,7 @@ public class NettyClient<I extends Writa
* Stop the client.
*/
public void stop() {
- // Close connections asyncronously, in a Netty-approved
+ // Close connections asynchronously, in a Netty-approved
// way, without cleaning up thread pools until all channels
// in addressChannelMap are closed (success or failure)
int channelCount = 0;
@@ -305,12 +341,15 @@ public class NettyClient<I extends Writa
result.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture cf) {
+ context.progress();
if (count.incrementAndGet() == done) {
if (LOG.isInfoEnabled()) {
LOG.info("stop: reached wait threshold, " +
done + " connections closed, releasing " +
"NettyClient.bootstrap resources now.");
}
+ bossExecutorService.shutdownNow();
+ workerExecutorService.shutdownNow();
bootstrap.releaseExternalResources();
}
}
@@ -320,25 +359,81 @@ public class NettyClient<I extends Writa
}
/**
+ * Get the next available channel, reconnecting if necessary
+ * If the next channel from the address's ChannelRotater is connected it
+ * is returned unchanged. Otherwise that channel is removed from the
+ * rotater and blocking connect attempts are made, sleeping 5 seconds
+ * after each failure; the first channel that connects is added to the
+ * rotater and returned.
+ * NOTE(review): the retry loop is bounded by maxConnectionFailures even
+ * though this class also declares maxReconnectionFailures - confirm which
+ * limit was intended. The log prefixes ("checkAndFixChannel",
+ * "blockingConnect") also do not match this method's name.
+ * NOTE(review): assumes a ChannelRotater is already registered for
+ * remoteServer; a missing map entry would fail on the first statement.
+ *
+ * @param remoteServer Remote server to get a channel for
+ * @return Available channel for this remote server
+ * @throws IllegalStateException if the rotater yields no channel or every
+ * reconnect attempt fails
+ */
+ private Channel getNextChannel(InetSocketAddress remoteServer) {
+ Channel channel = addressChannelMap.get(remoteServer).nextChannel();
+ if (channel == null) {
+ throw new IllegalStateException(
+ "getNextChannel: No channel exists for " + remoteServer);
+ }
+
+ // Return this channel if it is connected
+ if (channel.isConnected()) {
+ return channel;
+ }
+
+ // Get rid of the failed channel
+ addressChannelMap.get(remoteServer).removeLast();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("checkAndFixChannel: Fixing disconnected channel to " +
+ remoteServer + ", open = " + channel.isOpen() + ", " +
+ "bound = " + channel.isBound());
+ }
+ int reconnectFailures = 0;
+ while (reconnectFailures < maxConnectionFailures) {
+ ChannelFuture connectionFuture = bootstrap.connect(remoteServer);
+ connectionFuture.awaitUninterruptibly(); // block until this attempt finishes
+ if (connectionFuture.isSuccess()) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("checkAndFixChannel: Connected to " + remoteServer + "!");
+ }
+ addressChannelMap.get(remoteServer).addChannel(
+ connectionFuture.getChannel());
+ return connectionFuture.getChannel();
+ }
+ ++reconnectFailures;
+ LOG.warn("checkAndFixChannel: Failed to reconnect to " + remoteServer +
+ " on attempt " + reconnectFailures + " out of " +
+ maxConnectionFailures + " max attempts, sleeping for 5 secs",
+ connectionFuture.getCause());
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) { // only logged; the retry loop keeps going
+ LOG.warn("blockingConnect: Unexpected interrupted exception", e);
+ }
+ }
+ throw new IllegalStateException("checkAndFixChannel: Failed to connect " +
+ "to " + remoteServer + " in " + reconnectFailures +
+ " connect attempts");
+ }
+
+ /**
* Send a request to a remote server (should be already connected)
*
+ * @param destWorkerId Destination worker id
* @param remoteServer Server to send the request to
* @param request Request to send
*/
- public void sendWritableRequest(InetSocketAddress remoteServer,
+ public void sendWritableRequest(Integer destWorkerId,
+ InetSocketAddress remoteServer,
WritableRequest<I, V, E, M> request) {
- if (outstandingRequestMap.isEmpty()) {
+ if (clientRequestIdRequestInfoMap.isEmpty()) {
byteCounter.resetAll();
}
- request.setRequestId(requestId.incrementAndGet());
- Channel channel = addressChannelMap.get(remoteServer).nextChannel();
- if (channel == null) {
- throw new IllegalStateException(
- "sendWritableRequest: No channel exists for " + remoteServer);
- }
- RequestInfo newRequestInfo = new RequestInfo(remoteServer);
- RequestInfo oldRequestInfo = outstandingRequestMap.putIfAbsent(
- request.getRequestId(), newRequestInfo);
+
+ Channel channel = getNextChannel(remoteServer);
+ request.setClientId(clientId);
+ request.setRequestId(
+ addressRequestIdGenerator.getNextRequestId(remoteServer));
+
+ RequestInfo newRequestInfo = new RequestInfo(remoteServer, request);
+ RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(
+ new ClientRequestId(destWorkerId, request.getRequestId()),
+ newRequestInfo);
if (oldRequestInfo != null) {
throw new IllegalStateException("sendWritableRequest: Impossible to " +
"have a previous request id = " + request.getRequestId() + ", " +
@@ -348,7 +443,7 @@ public class NettyClient<I extends Writa
newRequestInfo.setWriteFuture(writeFuture);
if (limitNumberOfOpenRequests &&
- outstandingRequestMap.size() > maxNumberOfOpenRequests) {
+ clientRequestIdRequestInfoMap.size() > maxNumberOfOpenRequests) {
waitSomeRequests(maxNumberOfOpenRequests);
}
}
@@ -367,37 +462,91 @@ public class NettyClient<I extends Writa
}
/**
- * Ensure that at most maxOpenRequests are not complete
+ * Ensure that at most maxOpenRequests are not complete. Periodically,
+ * check the state of every request. If we find the connection failed,
+ * re-establish it and re-send the request.
*
* @param maxOpenRequests Maximum number of requests which can be not
* complete
*/
private void waitSomeRequests(int maxOpenRequests) {
- synchronized (outstandingRequestMap) {
- while (outstandingRequestMap.size() > maxOpenRequests) {
- if (LOG.isInfoEnabled() && requestLogger.isPrintable()) {
- LOG.info("waitSomeRequests: Waiting interval of " +
- WAITING_REQUEST_MSECS + " msecs, " +
- outstandingRequestMap.size() +
- " open requests, waiting for it to be <= " + maxOpenRequests +
- ", " + byteCounter.getMetrics());
-
- if (outstandingRequestMap.size() < MAX_REQUESTS_TO_LIST) {
- for (Map.Entry<Long, RequestInfo> entry :
- outstandingRequestMap.entrySet()) {
- LOG.info("waitSomeRequests: Waiting for request " +
- entry.getKey() + " - " + entry.getValue());
- }
+ List<ClientRequestId> addedRequestIds = Lists.newArrayList();
+ List<RequestInfo<I, V, E, M>> addedRequestInfos =
+ Lists.newArrayList();
+
+ while (clientRequestIdRequestInfoMap.size() > maxOpenRequests) {
+ // Wait for requests to complete for some time
+ if (LOG.isInfoEnabled() && requestLogger.isPrintable()) {
+ LOG.info("waitSomeRequests: Waiting interval of " +
+ waitingRequestMsecs + " msecs, " +
+ clientRequestIdRequestInfoMap.size() +
+ " open requests, waiting for it to be <= " + maxOpenRequests +
+ ", " + byteCounter.getMetrics());
+
+ if (clientRequestIdRequestInfoMap.size() < MAX_REQUESTS_TO_LIST) {
+ for (Map.Entry<ClientRequestId, RequestInfo> entry :
+ clientRequestIdRequestInfoMap.entrySet()) {
+ LOG.info("waitSomeRequests: Waiting for request " +
+ entry.getKey() + " - " + entry.getValue());
}
}
+ }
+ synchronized (clientRequestIdRequestInfoMap) {
try {
- outstandingRequestMap.wait(WAITING_REQUEST_MSECS);
+ clientRequestIdRequestInfoMap.wait(waitingRequestMsecs);
} catch (InterruptedException e) {
LOG.error("waitFutures: Got unexpected InterruptedException", e);
}
- // Make sure that waiting doesn't kill the job
- context.progress();
}
+ // Make sure that waiting doesn't kill the job
+ context.progress();
+
+ // Check all the requests for problems
+ for (Map.Entry<ClientRequestId, RequestInfo> entry :
+ clientRequestIdRequestInfoMap.entrySet()) {
+ RequestInfo requestInfo = entry.getValue();
+ ChannelFuture writeFuture = requestInfo.getWriteFuture();
+ // If not connected anymore, request failed, or the request is taking
+ // too long, re-establish and resend
+ if (!writeFuture.getChannel().isConnected() ||
+ (writeFuture.isDone() && !writeFuture.isSuccess()) ||
+ (requestInfo.getElapsedMsecs() > maxRequestMilliseconds)) {
+ LOG.warn("waitSomeRequests: Problem with request id " +
+ entry.getKey() + " connected = " +
+ writeFuture.getChannel().isConnected() +
+ ", future done = " + writeFuture.isDone() + ", " +
+ "success = " + writeFuture.isSuccess() + ", " +
+ "cause = " + writeFuture.getCause() + ", " +
+ "elapsed time = " + requestInfo.getElapsedMsecs() + ", " +
+ "destination = " + writeFuture.getChannel().getRemoteAddress() +
+ " " + requestInfo);
+ addedRequestIds.add(entry.getKey());
+ addedRequestInfos.add(new RequestInfo<I, V, E, M>(
+ requestInfo.getDestinationAddress(), requestInfo.getRequest()));
+ }
+ }
+
+ // Add any new requests to the system, connect if necessary, and re-send
+ for (int i = 0; i < addedRequestIds.size(); ++i) {
+ ClientRequestId requestId = addedRequestIds.get(i);
+ RequestInfo<I, V, E, M> requestInfo = addedRequestInfos.get(i);
+
+ if (clientRequestIdRequestInfoMap.put(requestId, requestInfo) ==
+ null) {
+ LOG.warn("waitSomeRequests: Request " + requestId +
+ " completed prior to sending the next request");
+ clientRequestIdRequestInfoMap.remove(requestId);
+ }
+ InetSocketAddress remoteServer = requestInfo.getDestinationAddress();
+ Channel channel = getNextChannel(remoteServer);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("waitSomeRequests: Re-issuing request " + requestInfo);
+ }
+ ChannelFuture writeFuture = channel.write(requestInfo.getRequest());
+ requestInfo.setWriteFuture(writeFuture);
+ }
+ addedRequestIds.clear();
+ addedRequestInfos.clear();
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java Tue Aug 21 22:17:56 2012
@@ -21,10 +21,9 @@ package org.apache.giraph.comm;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.apache.giraph.comm.messages.SendPartitionCurrentMessagesRequest;
import org.apache.giraph.graph.GiraphJob;
@@ -60,8 +59,6 @@ public class NettyServer<I extends Writa
M extends Writable> {
/** Default maximum thread pool size */
public static final int MAXIMUM_THREAD_POOL_SIZE_DEFAULT = 32;
- /** Default TCP backlog */
- public static final int TCP_BACKLOG_DEFAULT = 100;
/** Class logger */
private static final Logger LOG = Logger.getLogger(NettyServer.class);
/** Configuration */
@@ -78,6 +75,8 @@ public class NettyServer<I extends Writa
private InetSocketAddress myAddress;
/** Maximum number of threads */
private final int maximumPoolSize;
+ /** TCP backlog */
+ private final int tcpBacklog;
/** Request reqistry */
private final RequestRegistry requestRegistry = new RequestRegistry();
/** Server data */
@@ -90,6 +89,12 @@ public class NettyServer<I extends Writa
private final int sendBufferSize;
/** Receive buffer size */
private final int receiveBufferSize;
+ /** Boss factory service */
+ private final ExecutorService bossExecutorService;
+ /** Worker factory service */
+ private final ExecutorService workerExecutorService;
+ /** Request completed map per worker */
+ private final WorkerRequestReservedMap workerRequestReservedMap;
/**
* Constructor for creating the server
@@ -115,12 +120,15 @@ public class NettyServer<I extends Writa
receiveBufferSize = conf.getInt(GiraphJob.SERVER_RECEIVE_BUFFER_SIZE,
GiraphJob.DEFAULT_SERVER_RECEIVE_BUFFER_SIZE);
- ThreadFactory bossFactory = new ThreadFactoryBuilder()
- .setNameFormat("Giraph Netty Boss #%d")
- .build();
- ThreadFactory workerFactory = new ThreadFactoryBuilder()
- .setNameFormat("Giraph Netty Worker #%d")
- .build();
+ workerRequestReservedMap = new WorkerRequestReservedMap(conf);
+
+ bossExecutorService = Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setNameFormat(
+ "Giraph Server Netty Boss #%d").build());
+ workerExecutorService = Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setNameFormat(
+ "Giraph Server Netty Worker #%d").build());
+
try {
this.localHostname = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
@@ -129,9 +137,12 @@ public class NettyServer<I extends Writa
maximumPoolSize = conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
MAXIMUM_THREAD_POOL_SIZE_DEFAULT);
+ tcpBacklog = conf.getInt(GiraphJob.TCP_BACKLOG,
+ conf.getInt(GiraphJob.MAX_WORKERS, GiraphJob.TCP_BACKLOG_DEFAULT));
+
channelFactory = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(bossFactory),
- Executors.newCachedThreadPool(workerFactory),
+ bossExecutorService,
+ workerExecutorService,
maximumPoolSize);
}
@@ -145,7 +156,7 @@ public class NettyServer<I extends Writa
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.sendBufferSize", sendBufferSize);
bootstrap.setOption("child.receiveBufferSize", receiveBufferSize);
- bootstrap.setOption("backlog", TCP_BACKLOG_DEFAULT);
+ bootstrap.setOption("backlog", tcpBacklog);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
@@ -153,7 +164,8 @@ public class NettyServer<I extends Writa
byteCounter,
new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
new RequestDecoder<I, V, E, M>(conf, requestRegistry, byteCounter),
- new RequestServerHandler<I, V, E, M>(serverData));
+ new RequestServerHandler<I, V, E, M>(serverData,
+ workerRequestReservedMap, conf));
}
});
@@ -225,7 +237,9 @@ public class NettyServer<I extends Writa
if (LOG.isInfoEnabled()) {
LOG.info("stop: Halting netty server");
}
- accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+ accepted.close().awaitUninterruptibly();
+ bossExecutorService.shutdownNow();
+ workerExecutorService.shutdownNow();
bootstrap.releaseExternalResources();
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java Tue Aug 21 22:17:56 2012
@@ -84,6 +84,8 @@ public class NettyWorkerClient<I extends
private final int maxMessagesPerPartition;
/** Maximum number of mutations per partition before sending */
private final int maxMutationsPerPartition;
+ /** Maximum number of attempts to resolve an address */
+ private final int maxResolveAddressAttempts;
/** Messages sent during the last superstep */
private long totalMsgsSentInSuperstep = 0;
/** Server data from the server */
@@ -106,6 +108,9 @@ public class NettyWorkerClient<I extends
GiraphJob.MSG_SIZE_DEFAULT);
maxMutationsPerPartition = conf.getInt(GiraphJob.MAX_MUTATIONS_PER_REQUEST,
GiraphJob.MAX_MUTATIONS_PER_REQUEST_DEFAULT);
+ maxResolveAddressAttempts = conf.getInt(
+ GiraphJob.MAX_RESOLVE_ADDRESS_ATTEMPTS,
+ GiraphJob.MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT);
sendMessageCache = new SendMessageCache<I, M>(conf);
sendMutationsCache = new SendMutationsCache<I, V, E, M>();
this.serverData = serverData;
@@ -135,7 +140,13 @@ public class NettyWorkerClient<I extends
partitionIndexAddressMap.remove(
partitionOwner.getPartitionId());
}
- addresses.add(partitionOwner.getWorkerInfo().getHostnamePort());
+
+ // No need to connect to myself
+ if (service.getWorkerInfo().getPartitionId() !=
+ partitionOwner.getWorkerInfo().getPartitionId()) {
+ addresses.add(getInetSocketAddress(partitionOwner.getWorkerInfo(),
+ partitionOwner.getPartitionId()));
+ }
}
nettyClient.connectAllAddresses(addresses);
}
@@ -152,7 +163,25 @@ public class NettyWorkerClient<I extends
InetSocketAddress address =
partitionIndexAddressMap.get(partitionId);
if (address == null) {
- address = workerInfo.getHostnamePort();
+ address = workerInfo.getInetSocketAddress();
+ int resolveAttempts = 0;
+ while (address.isUnresolved() &&
+ resolveAttempts < maxResolveAddressAttempts) {
+ address = workerInfo.getInetSocketAddress();
+ ++resolveAttempts;
+ LOG.warn("getInetSocketAddress: Failed to resolve " + address +
+ " on attempt " + resolveAttempts + " of " +
+ maxResolveAddressAttempts + " attempts, sleeping for 5 seconds");
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ LOG.warn("getInetSocketAddress: Interrupted.", e);
+ }
+ }
+ if (resolveAttempts >= maxResolveAddressAttempts) {
+ throw new IllegalStateException("getInetSocketAddress: Coudldn't " +
+ "resolve " + address + " in " + resolveAttempts + " tries.");
+ }
partitionIndexAddressMap.put(partitionId, address);
}
@@ -160,32 +189,22 @@ public class NettyWorkerClient<I extends
}
/**
- * Fill the socket address cache for the partition owner.
- *
- * @param destVertex vertex to be sent
- * @return address of the vertex range server containing this vertex
- */
- private InetSocketAddress getInetSocketAddress(I destVertex) {
- PartitionOwner partitionOwner =
- service.getVertexPartitionOwner(destVertex);
- return getInetSocketAddress(partitionOwner.getWorkerInfo(),
- partitionOwner.getPartitionId());
- }
-
- /**
* When doing the request, short circuit if it is local
*
+ * @param workerInfo Worker info
* @param remoteServerAddress Remote server address (checked against local)
* @param writableRequest Request to either submit or run locally
*/
- private void doRequest(InetSocketAddress remoteServerAddress,
+ private void doRequest(WorkerInfo workerInfo,
+ InetSocketAddress remoteServerAddress,
WritableRequest<I, V, E, M> writableRequest) {
// If this is local, execute locally
- if (service.getWorkerInfo().getHostnamePort().equals(
- remoteServerAddress)) {
+ if (service.getWorkerInfo().getPartitionId() ==
+ workerInfo.getPartitionId()) {
writableRequest.doRequest(serverData);
} else {
- nettyClient.sendWritableRequest(remoteServerAddress, writableRequest);
+ nettyClient.sendWritableRequest(
+ workerInfo.getPartitionId(), remoteServerAddress, writableRequest);
}
}
@@ -213,7 +232,8 @@ public class NettyWorkerClient<I extends
WritableRequest<I, V, E, M> writableRequest =
new SendPartitionMessagesRequest<I, V, E, M>(
partitionId, partitionMessages);
- doRequest(remoteServerAddress, writableRequest);
+ doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
+ writableRequest);
}
}
@@ -231,9 +251,9 @@ public class NettyWorkerClient<I extends
WritableRequest<I, V, E, M> vertexRequest =
new SendVertexRequest<I, V, E, M>(partitionId,
partition.getVertices());
- doRequest(remoteServerAddress, vertexRequest);
+ doRequest(workerInfo, remoteServerAddress, vertexRequest);
- // messages are stored separately
+ // Messages are stored separately
MessageStoreByPartition<I, M> messageStore =
service.getServerData().getCurrentMessageStore();
Map<I, Collection<M>> map = Maps.newHashMap();
@@ -251,7 +271,7 @@ public class NettyWorkerClient<I extends
if (messagesInMap > maxMessagesPerPartition) {
WritableRequest<I, V, E, M> messagesRequest = new
SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
- doRequest(remoteServerAddress, messagesRequest);
+ doRequest(workerInfo, remoteServerAddress, messagesRequest);
map.clear();
messagesInMap = 0;
}
@@ -259,7 +279,7 @@ public class NettyWorkerClient<I extends
if (!map.isEmpty()) {
WritableRequest<I, V, E, M> messagesRequest = new
SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
- doRequest(remoteServerAddress, messagesRequest);
+ doRequest(workerInfo, remoteServerAddress, messagesRequest);
}
}
@@ -283,7 +303,8 @@ public class NettyWorkerClient<I extends
WritableRequest<I, V, E, M> writableRequest =
new SendPartitionMutationsRequest<I, V, E, M>(
partitionId, partitionMutations);
- doRequest(remoteServerAddress, writableRequest);
+ doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
+ writableRequest);
}
}
@@ -372,9 +393,14 @@ public class NettyWorkerClient<I extends
WritableRequest<I, V, E, M> writableRequest =
new SendPartitionMessagesRequest<I, V, E, M>(
entry.getKey(), entry.getValue());
+ PartitionOwner partitionOwner =
+ service.getVertexPartitionOwner(
+ entry.getValue().keySet().iterator().next());
InetSocketAddress remoteServerAddress =
- getInetSocketAddress(entry.getValue().keySet().iterator().next());
- doRequest(remoteServerAddress, writableRequest);
+ getInetSocketAddress(partitionOwner.getWorkerInfo(),
+ partitionOwner.getPartitionId());
+ doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
+ writableRequest);
}
// Execute the remaining sends mutations (if any)
@@ -385,9 +411,14 @@ public class NettyWorkerClient<I extends
WritableRequest<I, V, E, M> writableRequest =
new SendPartitionMutationsRequest<I, V, E, M>(
entry.getKey(), entry.getValue());
+ PartitionOwner partitionOwner =
+ service.getVertexPartitionOwner(
+ entry.getValue().keySet().iterator().next());
InetSocketAddress remoteServerAddress =
- getInetSocketAddress(entry.getValue().keySet().iterator().next());
- doRequest(remoteServerAddress, writableRequest);
+ getInetSocketAddress(partitionOwner.getWorkerInfo(),
+ partitionOwner.getPartitionId());
+ doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
+ writableRequest);
}
nettyClient.waitAllRequests();
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java Tue Aug 21 22:17:56 2012
@@ -40,7 +40,7 @@ import org.jboss.netty.handler.codec.one
@SuppressWarnings("rawtypes")
public class RequestDecoder<I extends WritableComparable,
V extends Writable, E extends Writable,
- M extends Writable> extends OneToOneDecoder {
+ M extends Writable> extends OneToOneDecoder {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(RequestDecoder.class);
@@ -58,8 +58,9 @@ public class RequestDecoder<I extends Wr
* @param requestRegistry Request registry
* @param byteCounter Keeps track of the decoded bytes
*/
- public RequestDecoder(Configuration conf, RequestRegistry requestRegistry,
- ByteCounter byteCounter) {
+ public RequestDecoder(
+ Configuration conf, RequestRegistry requestRegistry,
+ ByteCounter byteCounter) {
this.conf = conf;
this.requestRegistry = requestRegistry;
this.byteCounter = byteCounter;
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/RequestInfo.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestInfo.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/RequestInfo.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/RequestInfo.java Tue Aug 21 22:17:56 2012
@@ -20,16 +20,26 @@ package org.apache.giraph.comm;
import java.net.InetSocketAddress;
import java.util.Date;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import org.jboss.netty.channel.ChannelFuture;
/**
* Help track requests throughout the system
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
*/
-public class RequestInfo {
+public class RequestInfo<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable> {
/** Destination of the request */
private final InetSocketAddress destinationAddress;
/** When the request was started */
private final long startedMsecs;
+ /** Request */
+ private final WritableRequest<I, V, E, M> request;
/** Future of the write of this request*/
private volatile ChannelFuture writeFuture;
@@ -37,13 +47,16 @@ public class RequestInfo {
* Constructor.
*
* @param destinationAddress Destination of the request
+ * @param request Request that is sent
*/
- public RequestInfo(InetSocketAddress destinationAddress) {
+ public RequestInfo(InetSocketAddress destinationAddress,
+ WritableRequest<I, V, E, M> request) {
this.destinationAddress = destinationAddress;
+ this.request = request;
this.startedMsecs = System.currentTimeMillis();
}
- public InetSocketAddress getAddress() {
+ public InetSocketAddress getDestinationAddress() {
return destinationAddress;
}
@@ -60,6 +73,10 @@ public class RequestInfo {
return System.currentTimeMillis() - startedMsecs;
}
+ public WritableRequest<I, V, E, M> getRequest() {
+ return request;
+ }
+
public void setWriteFuture(ChannelFuture writeFuture) {
this.writeFuture = writeFuture;
}
@@ -72,7 +89,7 @@ public class RequestInfo {
public String toString() {
return "(destAddr=" + destinationAddress +
",startDate=" + new Date(startedMsecs) + ",elapsedMsecs=" +
- getElapsedMsecs() +
+ getElapsedMsecs() + ",reqId=" + request.getRequestId() +
((writeFuture == null) ? ")" :
",writeDone=" + writeFuture.isDone() +
",writeSuccess=" + writeFuture.isSuccess() + ")");
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java Tue Aug 21 22:17:56 2012
@@ -18,6 +18,8 @@
package org.apache.giraph.comm;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
@@ -41,19 +43,39 @@ import org.jboss.netty.channel.SimpleCha
public class RequestServerHandler<I extends WritableComparable,
V extends Writable, E extends Writable,
M extends Writable> extends SimpleChannelUpstreamHandler {
+ /** Number of bytes in the encoded response */
+ public static final int RESPONSE_BYTES = 13;
/** Class logger */
private static final Logger LOG =
Logger.getLogger(RequestServerHandler.class);
+ /** Already closed first request? */
+ private static volatile boolean ALREADY_CLOSED_FIRST_REQUEST = false;
+ /** Close connection on first request (used for simulating failure) */
+ private final boolean closeFirstRequest;
/** Data that can be accessed for handling requests */
private final ServerData<I, V, E, M> serverData;
+ /** Request reserved map (for exactly once semantics) */
+ private final WorkerRequestReservedMap workerRequestReservedMap;
+ /** My worker id */
+ private final int myWorkerId;
/**
* Constructor with external server data
*
* @param serverData Data held by the server
+ * @param workerRequestReservedMap Worker request reservation map
+ * @param conf Configuration
*/
- public RequestServerHandler(ServerData<I, V, E, M> serverData) {
+ public RequestServerHandler(
+ ServerData<I, V, E, M> serverData,
+ WorkerRequestReservedMap workerRequestReservedMap,
+ Configuration conf) {
this.serverData = serverData;
+ this.workerRequestReservedMap = workerRequestReservedMap;
+ closeFirstRequest = conf.getBoolean(
+ GiraphJob.NETTY_SIMULATE_FIRST_REQUEST_CLOSED,
+ GiraphJob.NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT);
+ myWorkerId = conf.getInt("mapred.task.partition", -1);
}
@Override
@@ -62,15 +84,41 @@ public class RequestServerHandler<I exte
if (LOG.isDebugEnabled()) {
LOG.debug("messageReceived: Got " + e.getMessage().getClass());
}
+
@SuppressWarnings("unchecked")
WritableRequest<I, V, E, M> writableRequest =
(WritableRequest<I, V, E, M>) e.getMessage();
- writableRequest.doRequest(serverData);
- // Send the success response with the request id
- ChannelBuffer buffer = ChannelBuffers.directBuffer(9);
+ // Simulate a closed connection on the first request (if desired)
+ if (closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) {
+ LOG.info("messageReceived: Simulating closing channel on first " +
+ "request " + writableRequest.getRequestId() + " from " +
+ writableRequest.getClientId());
+ ALREADY_CLOSED_FIRST_REQUEST = true;
+ ctx.getChannel().close();
+ return;
+ }
+
+ // Only execute this request exactly once
+ int alreadyDone = 1;
+ if (workerRequestReservedMap.reserveRequest(
+ writableRequest.getClientId(),
+ writableRequest.getRequestId())) {
+ writableRequest.doRequest(serverData);
+ alreadyDone = 0;
+ } else {
+ LOG.info("messageReceived: Request id " +
+ writableRequest.getRequestId() + " from client " +
+ writableRequest.getClientId() +
+ " was already processed, " +
+ "not processing again.");
+ }
+
+ // Send the response with the request id
+ ChannelBuffer buffer = ChannelBuffers.directBuffer(RESPONSE_BYTES);
+ buffer.writeInt(myWorkerId);
buffer.writeLong(writableRequest.getRequestId());
- buffer.writeByte(0);
+ buffer.writeByte(alreadyDone);
e.getChannel().write(buffer);
}
@@ -88,13 +136,14 @@ public class RequestServerHandler<I exte
ChannelStateEvent e) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("channelClosed: Closed the channel on " +
- ctx.getChannel().getRemoteAddress());
+ ctx.getChannel().getRemoteAddress() + " with event " +
+ e);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- throw new IllegalStateException("exceptionCaught: Channel failed with " +
+ LOG.warn("exceptionCaught: Channel failed with " +
"remote address " + ctx.getChannel().getRemoteAddress(), e.getCause());
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java Tue Aug 21 22:17:56 2012
@@ -21,6 +21,8 @@ package org.apache.giraph.comm;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
@@ -37,17 +39,29 @@ public class ResponseClientHandler exten
/** Class logger */
private static final Logger LOG =
Logger.getLogger(ResponseClientHandler.class);
- /** Outstanding request map */
- private final ConcurrentMap<Long, RequestInfo> outstandingRequestMap;
+ /** Already dropped first response? (used if dropFirstResponse == true) */
+ private static volatile boolean ALREADY_DROPPED_FIRST_RESPONSE = false;
+ /** Drop first response (used for simulating failure) */
+ private final boolean dropFirstResponse;
+ /** Outstanding worker request map */
+ private final ConcurrentMap<ClientRequestId, RequestInfo>
+ workerIdOutstandingRequestMap;
/**
* Constructor.
*
- * @param outstandingRequestMap Map of outstanding requests
+ * @param workerIdOutstandingRequestMap Map of worker ids to outstanding
+ * requests
+ * @param conf Configuration
*/
public ResponseClientHandler(
- ConcurrentMap<Long, RequestInfo> outstandingRequestMap) {
- this.outstandingRequestMap = outstandingRequestMap;
+ ConcurrentMap<ClientRequestId, RequestInfo>
+ workerIdOutstandingRequestMap,
+ Configuration conf) {
+ this.workerIdOutstandingRequestMap = workerIdOutstandingRequestMap;
+ dropFirstResponse = conf.getBoolean(
+ GiraphJob.NETTY_SIMULATE_FIRST_RESPONSE_FAILED,
+ GiraphJob.NETTY_SIMULATE_FIRST_RESPONSE_FAILED_DEFAULT);
}
@Override
@@ -57,11 +71,14 @@ public class ResponseClientHandler exten
throw new IllegalStateException("messageReceived: Got a " +
"non-ChannelBuffer message " + event.getMessage());
}
+
ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
ChannelBufferInputStream inputStream = new ChannelBufferInputStream(buffer);
+ int senderId = -1;
long requestId = -1;
int response = -1;
try {
+ senderId = inputStream.readInt();
requestId = inputStream.readLong();
response = inputStream.readByte();
inputStream.close();
@@ -69,12 +86,27 @@ public class ResponseClientHandler exten
throw new IllegalStateException(
"messageReceived: Got IOException ", e);
}
- if (response != 0) {
+
+ // Simulate a failed response on the first response (if desired)
+ if (dropFirstResponse && !ALREADY_DROPPED_FIRST_RESPONSE) {
+ LOG.info("messageReceived: Simulating dropped response " + response +
+ " for request " + requestId);
+ ALREADY_DROPPED_FIRST_RESPONSE = true;
+ synchronized (workerIdOutstandingRequestMap) {
+ workerIdOutstandingRequestMap.notifyAll();
+ }
+ return;
+ }
+
+ if (response == 1) {
+ LOG.info("messageReceived: Already completed request " + requestId);
+ } else if (response != 0) {
throw new IllegalStateException(
"messageReceived: Got illegal response " + response);
}
- RequestInfo requestInfo = outstandingRequestMap.remove(requestId);
+ RequestInfo requestInfo = workerIdOutstandingRequestMap.remove(
+ new ClientRequestId(senderId, requestId));
if (requestInfo == null) {
throw new IllegalStateException("messageReceived: Impossible to " +
"have a non-registered requestId " + requestId);
@@ -82,14 +114,14 @@ public class ResponseClientHandler exten
if (LOG.isDebugEnabled()) {
LOG.debug("messageReceived: Processed request id = " + requestId +
" " + requestInfo + ". Waiting on " +
- outstandingRequestMap.size() +
+ workerIdOutstandingRequestMap.size() +
" requests, bytes = " + buffer.capacity());
}
}
// Help NettyClient#waitSomeRequests() to finish faster
- synchronized (outstandingRequestMap) {
- outstandingRequestMap.notifyAll();
+ synchronized (workerIdOutstandingRequestMap) {
+ workerIdOutstandingRequestMap.notifyAll();
}
}
@@ -104,7 +136,7 @@ public class ResponseClientHandler exten
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- throw new IllegalStateException("exceptionCaught: Channel failed with " +
+ LOG.warn("exceptionCaught: Channel failed with " +
"remote address " + ctx.getChannel().getRemoteAddress(), e.getCause());
}
}
Added: giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerRequestReservedMap.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerRequestReservedMap.java?rev=1375824&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerRequestReservedMap.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerRequestReservedMap.java Tue Aug 21 22:17:56 2012
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import com.google.common.collect.MapMaker;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Thread-safe record, per worker id, of the request ids already reserved
+ */
+public class WorkerRequestReservedMap {
+ /** Map of worker id to the bit set of request ids reserved so far */
+ private final ConcurrentMap<Integer, IncreasingBitSet>
+ workerRequestReservedMap;
+
+ /**
+ * Constructor. Map concurrency level is GiraphJob.MSG_NUM_FLUSH_THREADS.
+ *
+ * @param conf Configuration (read for GiraphJob.MSG_NUM_FLUSH_THREADS)
+ */
+ public WorkerRequestReservedMap(Configuration conf) {
+ workerRequestReservedMap = new MapMaker().concurrencyLevel(
+ conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+ NettyServer.MAXIMUM_THREAD_POOL_SIZE_DEFAULT)).makeMap();
+ }
+
+ /**
+ * Reserve the request (before the request starts, to ensure that it is
+ * only executed once). We are assuming no failure on the server.
+ *
+ * @param workerId Id of the worker that sent the request
+ * @param requestId Request id
+ * @return True if the reserving succeeded, false otherwise
+ */
+ public boolean reserveRequest(Integer workerId, long requestId) {
+ IncreasingBitSet requestSet = getRequestSet(workerId);
+ return requestSet.add(requestId);
+ }
+
+ /**
+ * Get the request bit set for a worker, creating it if absent. If another
+ * thread adds one first, that thread's bit set is returned instead.
+ * @param workerId Id of the worker to get the bit set for
+ * @return Bit set for the worker
+ */
+ private IncreasingBitSet getRequestSet(Integer workerId) {
+ IncreasingBitSet requestSet = workerRequestReservedMap.get(workerId);
+ if (requestSet == null) {
+ requestSet = new IncreasingBitSet();
+ IncreasingBitSet previous =
+ workerRequestReservedMap.putIfAbsent(workerId, requestSet);
+ if (previous != null) {
+ requestSet = previous;
+ }
+ }
+ return requestSet;
+ }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java Tue Aug 21 22:17:56 2012
@@ -41,9 +41,19 @@ public abstract class WritableRequest<I
M extends Writable> implements Writable, Configurable {
/** Configuration */
private Configuration conf;
+ /** Client id */
+ private int clientId = -1;
/** Request id */
private long requestId = -1;
+ public int getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(int clientId) {
+ this.clientId = clientId;
+ }
+
public long getRequestId() {
return requestId;
}
@@ -92,12 +102,14 @@ public abstract class WritableRequest<I
@Override
public final void readFields(DataInput input) throws IOException {
+ clientId = input.readInt();
requestId = input.readLong();
readFieldsRequest(input);
}
@Override
public final void write(DataOutput output) throws IOException {
+ output.writeInt(clientId);
output.writeLong(requestId);
writeRequest(output);
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Tue Aug 21 22:17:56 2012
@@ -1356,9 +1356,9 @@ public class BspServiceMaster<I extends
break;
}
- // Wait for a signal or no more than 60 seconds to progress
+ // Wait for a signal or no more than 30 seconds to progress
// or else will continue.
- event.waitMsecs(60 * 1000);
+ event.waitMsecs(30 * 1000);
event.reset();
getContext().progress();
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Aug 21 22:17:56 2012
@@ -985,7 +985,7 @@ public class BspServiceWorker<I extends
if (LOG.isInfoEnabled()) {
LOG.info("finishSuperstep: Superstep " + getSuperstep() +
- " , mesages = " + workerSentMessages + " " +
+ ", mesages = " + workerSentMessages + " " +
MemoryUtils.getRuntimeMemoryStats());
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Tue Aug 21 22:17:56 2012
@@ -159,6 +159,52 @@ public class GiraphJob {
/** Default is to use RPC, not netty */
public static final boolean USE_NETTY_DEFAULT = false;
+ /** TCP backlog (defaults to number of workers) */
+ public static final String TCP_BACKLOG = "giraph.tcpBacklog";
+ /**
+ * Default TCP backlog if the number of workers is not specified
+ * (i.e. unit tests)
+ */
+ public static final int TCP_BACKLOG_DEFAULT = 1;
+
+ /** Netty simulate a first request closed */
+ public static final String NETTY_SIMULATE_FIRST_REQUEST_CLOSED =
+ "giraph.nettySimulateFirstRequestClosed";
+ /** Default of not simulating failure for first request */
+ public static final boolean NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT =
+ false;
+
+ /** Netty simulate a first response failed */
+ public static final String NETTY_SIMULATE_FIRST_RESPONSE_FAILED =
+ "giraph.nettySimulateFirstResponseFailed";
+ /** Default of not simulating failure for first response */
+ public static final boolean NETTY_SIMULATE_FIRST_RESPONSE_FAILED_DEFAULT =
+ false;
+
+ /** Maximum number of reconnection attempts */
+ public static final String MAX_RECONNECT_ATTEMPTS =
+ "giraph.maxNumberOfOpenRequests";
+ /** Default maximum number of reconnection attempts */
+ public static final int MAX_RECONNECT_ATTEMPTS_DEFAULT = 10;
+
+ /** Max resolve address attempts */
+ public static final String MAX_RESOLVE_ADDRESS_ATTEMPTS =
+ "giraph.maxResolveAddressAttempts";
+ /** Default max resolve address attempts */
+ public static final int MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT = 5;
+
+ /** Msecs to wait between waiting for all requests to finish */
+ public static final String WAITING_REQUEST_MSECS =
+ "giraph.waitingRequestMsecs";
+ /** Default msecs to wait between waiting for all requests to finish */
+ public static final int WAITING_REQUEST_MSECS_DEFAULT = 15000;
+
+ /** Milliseconds for a request to complete (or else resend) */
+ public static final String MAX_REQUEST_MILLISECONDS =
+ "giraph.maxRequestMilliseconds";
+ /** Maximum number of milliseconds for a request to complete */
+ public static final int MAX_REQUEST_MILLISECONDS_DEFAULT = 60 * 1000;
+
/** Netty max connection failures */
public static final String NETTY_MAX_CONNECTION_FAILURES =
"giraph.nettyMaxConnectionFailures";
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java Tue Aug 21 22:17:56 2012
@@ -70,7 +70,12 @@ public class WorkerInfo implements Writa
return hostnameId;
}
- public InetSocketAddress getHostnamePort() {
+ /**
+ * Get a new instance of the InetSocketAddress for this hostname and port
+ *
+ * @return InetSocketAddress of the hostname and port.
+ */
+ public InetSocketAddress getInetSocketAddress() {
return new InetSocketAddress(hostname, port);
}
Added: giraph/trunk/src/test/java/org/apache/giraph/comm/IncreasingBitSetTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/IncreasingBitSetTest.java?rev=1375824&view=auto
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/IncreasingBitSetTest.java (added)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/IncreasingBitSetTest.java Tue Aug 21 22:17:56 2012
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test IncreasingBitSet
+ */
+public class IncreasingBitSetTest {
+ @Test
+ public void add256kIntegers() {
+ IncreasingBitSet IncreasingBitSet = new IncreasingBitSet();
+ for (int i = 0; i < 256 * 1024; ++i) {
+ assertFalse(IncreasingBitSet.has(i));
+ assertTrue(IncreasingBitSet.add(i));
+ assertTrue(IncreasingBitSet.has(i));
+ assertTrue(IncreasingBitSet.size() <=
+ IncreasingBitSet.MIN_BITS_TO_SHIFT);
+ }
+ assertEquals(256 * 1024L, IncreasingBitSet.getLastBaseKey());
+ }
+
+ @Test
+ public void add256kIntegersAlternate() {
+ IncreasingBitSet IncreasingBitSet = new IncreasingBitSet();
+ for (int i = 0; i < 256 * 1024; i += 2) {
+ assertFalse(IncreasingBitSet.has(i));
+ assertTrue(IncreasingBitSet.add(i));
+ assertTrue(IncreasingBitSet.has(i));
+ assertFalse(IncreasingBitSet.has(i + 1));
+ assertTrue(IncreasingBitSet.size() <= 256 * 1024);
+ }
+ assertEquals(128 * 1024L, IncreasingBitSet.cardinality());
+ for (int i = 1; i < 256 * 1024; i += 2) {
+ assertFalse(IncreasingBitSet.has(i));
+ assertTrue(IncreasingBitSet.add(i));
+ assertTrue(IncreasingBitSet.has(i));
+ assertTrue(IncreasingBitSet.has(i - 1));
+ assertTrue(IncreasingBitSet.size() <= 256 * 1024);
+ }
+ assertEquals(256 * 1024L, IncreasingBitSet.cardinality());
+ }
+
+ @Test
+ public void add256kIntegersOutOfOrder() {
+ IncreasingBitSet IncreasingBitSet = new IncreasingBitSet();
+ for (int i = 128 * 1024; i < 256 * 1024; ++i) {
+ assertFalse(IncreasingBitSet.has(i));
+ assertTrue(IncreasingBitSet.add(i));
+ assertTrue(IncreasingBitSet.has(i));
+ assertTrue(IncreasingBitSet.size() <= 512 * 1024);
+ }
+ assertEquals(128 * 1024L, IncreasingBitSet.cardinality());
+ for (int i = 0; i < 128 * 1024; ++i) {
+ assertFalse(IncreasingBitSet.has(i));
+ assertTrue(IncreasingBitSet.add(i));
+ assertTrue(IncreasingBitSet.has(i));
+ assertTrue(IncreasingBitSet.size() <= 512 * 1024);
+ }
+ assertEquals(256 * 1024L, IncreasingBitSet.cardinality());
+ assertEquals(256 * 1024L, IncreasingBitSet.getLastBaseKey());
+ }
+}
Copied: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java (from r1375717, giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java)
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?p2=giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java&p1=giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java&r1=1375717&r2=1375824&rev=1375824&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java Tue Aug 21 22:17:56 2012
@@ -18,12 +18,17 @@
package org.apache.giraph.comm;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
import org.apache.giraph.comm.messages.SimpleMessageStore;
-import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.utils.MockUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
@@ -34,25 +39,13 @@ import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
- * Test all the different netty requests.
+ * Test all the netty failure scenarios
*/
-public class RequestTest {
+public class RequestFailureTest {
/** Configuration */
private Configuration conf;
/** Server data */
@@ -64,6 +57,8 @@ public class RequestTest {
/** Client */
private NettyClient<IntWritable, IntWritable, IntWritable, IntWritable>
client;
+ /** Mock context */
+ private Context context;
/**
* Only for testing.
@@ -89,69 +84,12 @@ public class RequestTest {
conf.setClass(GiraphJob.MESSAGE_VALUE_CLASS,
IntWritable.class, Writable.class);
- @SuppressWarnings("rawtypes")
- Context context = mock(Context.class);
+ context = mock(Context.class);
when(context.getConfiguration()).thenReturn(conf);
-
- // Start the service
- serverData =
- new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
- (SimpleMessageStore.newFactory(
- MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
- server =
- new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
- conf, serverData);
- server.start();
- client =
- new NettyClient<IntWritable, IntWritable, IntWritable, IntWritable>
- (context);
- client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
- }
-
- @Test
- public void sendVertexPartition() throws IOException {
- // Data to send
- int partitionId = 13;
- Collection<Vertex<IntWritable, IntWritable, IntWritable,
- IntWritable>> vertices =
- new ArrayList<Vertex<IntWritable, IntWritable,
- IntWritable, IntWritable>>();
- for (int i = 0; i < 10; ++i) {
- TestVertex vertex = new TestVertex();
- vertex.initialize(new IntWritable(i), new IntWritable(i), null, null);
- vertices.add(vertex);
- }
-
- // Send the request
- SendVertexRequest<IntWritable, IntWritable, IntWritable,
- IntWritable> request =
- new SendVertexRequest<IntWritable, IntWritable,
- IntWritable, IntWritable>(partitionId, vertices);
- client.sendWritableRequest(server.getMyAddress(), request);
- client.waitAllRequests();
-
- // Stop the service
- client.stop();
- server.stop();
-
- // Check the output
- Map<Integer, Collection<Vertex<IntWritable, IntWritable,
- IntWritable, IntWritable>>> partitionVertexMap =
- serverData.getPartitionVertexMap();
- synchronized (partitionVertexMap) {
- assertTrue(partitionVertexMap.containsKey(partitionId));
- int total = 0;
- for (Vertex<IntWritable, IntWritable,
- IntWritable, IntWritable> vertex :
- (partitionVertexMap.get(partitionId))) {
- total += vertex.getId().get();
- }
- assertEquals(total, 45);
- }
}
- @Test
- public void sendPartitionMessagesRequest() throws IOException {
+ private WritableRequest<IntWritable, IntWritable, IntWritable,
+ IntWritable> getRequest() {
// Data to send
int partitionId = 17;
Map<IntWritable, Collection<IntWritable>> vertexIdMessages =
@@ -167,16 +105,13 @@ public class RequestTest {
// Send the request
SendPartitionMessagesRequest<IntWritable, IntWritable, IntWritable,
- IntWritable> request =
- new SendPartitionMessagesRequest<IntWritable, IntWritable,
- IntWritable, IntWritable>(partitionId, vertexIdMessages);
- client.sendWritableRequest(server.getMyAddress(), request);
- client.waitAllRequests();
-
- // Stop the service
- client.stop();
- server.stop();
+ IntWritable> request =
+ new SendPartitionMessagesRequest<IntWritable, IntWritable,
+ IntWritable, IntWritable>(partitionId, vertexIdMessages);
+ return request;
+ }
+ private void checkResult(int numRequests) throws IOException {
// Check the output
Iterable<IntWritable> vertices =
serverData.getIncomingMessageStore().getDestinationVertices();
@@ -193,78 +128,119 @@ public class RequestTest {
}
}
assertEquals(21, keySum);
- assertEquals(35, messageSum);
+ assertEquals(35 * numRequests, messageSum);
}
@Test
- public void sendPartitionMutationsRequest() throws IOException {
- // Data to send
- int partitionId = 19;
- Map<IntWritable, VertexMutations<IntWritable, IntWritable,
- IntWritable, IntWritable>> vertexIdMutations =
- Maps.newHashMap();
- for (int i = 0; i < 11; ++i) {
- VertexMutations<IntWritable, IntWritable, IntWritable, IntWritable>
- mutations =
- new VertexMutations<IntWritable, IntWritable,
- IntWritable, IntWritable>();
- for (int j = 0; j < 3; ++j) {
- TestVertex vertex = new TestVertex();
- vertex.initialize(new IntWritable(i), new IntWritable(j), null, null);
- mutations.addVertex(vertex);
- }
- for (int j = 0; j < 2; ++j) {
- mutations.removeVertex();
- }
- for (int j = 0; j < 5; ++j) {
- Edge<IntWritable, IntWritable> edge =
- new Edge<IntWritable, IntWritable>(
- new IntWritable(i), new IntWritable(2*j));
- mutations.addEdge(edge);
- }
- for (int j = 0; j < 7; ++j) {
- mutations.removeEdge(new IntWritable(j));
- }
- vertexIdMutations.put(new IntWritable(i), mutations);
- }
+ public void send2Requests() throws IOException {
+ // Start the service
+ serverData =
+ new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+ (SimpleMessageStore.newFactory(
+ MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
+ server =
+ new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
+ conf, serverData);
+ server.start();
+ client =
+ new NettyClient<IntWritable, IntWritable, IntWritable, IntWritable>
+ (context);
+ client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
- // Send the request
- SendPartitionMutationsRequest<IntWritable, IntWritable, IntWritable,
- IntWritable> request =
- new SendPartitionMutationsRequest<IntWritable, IntWritable,
- IntWritable, IntWritable>(partitionId, vertexIdMutations);
- client.sendWritableRequest(server.getMyAddress(), request);
+ // Send the request 2x
+ WritableRequest<IntWritable, IntWritable, IntWritable,
+ IntWritable> request1 = getRequest();
+ WritableRequest<IntWritable, IntWritable, IntWritable,
+ IntWritable> request2 = getRequest();
+ client.sendWritableRequest(-1, server.getMyAddress(), request1);
+ client.sendWritableRequest(-1, server.getMyAddress(), request2);
client.waitAllRequests();
// Stop the service
client.stop();
server.stop();
- // Check the output
- ConcurrentHashMap<IntWritable, VertexMutations<IntWritable, IntWritable,
- IntWritable, IntWritable>> inVertexIdMutations =
- serverData.getVertexMutations();
- int keySum = 0;
- for (Entry<IntWritable, VertexMutations<IntWritable, IntWritable,
- IntWritable, IntWritable>> entry :
- inVertexIdMutations.entrySet()) {
- synchronized (entry.getValue()) {
- keySum += entry.getKey().get();
- int vertexValueSum = 0;
- for (Vertex<IntWritable, IntWritable, IntWritable, IntWritable>
- vertex : entry.getValue().getAddedVertexList()) {
- vertexValueSum += vertex.getValue().get();
- }
- assertEquals(3, vertexValueSum);
- assertEquals(2, entry.getValue().getRemovedVertexCount());
- int removeEdgeValueSum = 0;
- for (Edge<IntWritable, IntWritable> edge :
- entry.getValue().getAddedEdgeList()) {
- removeEdgeValueSum += edge.getValue().get();
- }
- assertEquals(20, removeEdgeValueSum);
- }
- }
- assertEquals(55, keySum);
+ // Check the output (each of the 2 requests should be processed exactly once)
+ checkResult(2);
+ }
+
+ @Test
+ public void alreadyProcessedRequest() throws IOException {
+ // Simulate a failure of the first response
+ conf.setBoolean(GiraphJob.NETTY_SIMULATE_FIRST_RESPONSE_FAILED, true);
+ // One second to finish a request
+ conf.setInt(GiraphJob.MAX_REQUEST_MILLISECONDS, 1000);
+ // Loop every 2 seconds
+ conf.setInt(GiraphJob.WAITING_REQUEST_MSECS, 2000);
+
+ // Start the service
+ serverData =
+ new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+ (SimpleMessageStore.newFactory(
+ MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
+ server =
+ new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
+ conf, serverData);
+ server.start();
+ client =
+ new NettyClient<IntWritable, IntWritable, IntWritable, IntWritable>
+ (context);
+ client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
+
+ // Send 2 requests; each should only be processed once
+ WritableRequest<IntWritable, IntWritable, IntWritable,
+ IntWritable> request1 = getRequest();
+ WritableRequest<IntWritable, IntWritable, IntWritable,
+ IntWritable> request2 = getRequest();
+ client.sendWritableRequest(-1, server.getMyAddress(), request1);
+ client.sendWritableRequest(-1, server.getMyAddress(), request2);
+ client.waitAllRequests();
+
+ // Stop the service
+ client.stop();
+ server.stop();
+
+ // Check the output (each of the 2 requests should be processed exactly once)
+ checkResult(2);
+ }
+
+ @Test
+ public void resendRequest() throws IOException {
+ // Force a drop of the first request
+ conf.setBoolean(GiraphJob.NETTY_SIMULATE_FIRST_REQUEST_CLOSED, true);
+ // One second to finish a request
+ conf.setInt(GiraphJob.MAX_REQUEST_MILLISECONDS, 1000);
+ // Loop every 2 seconds
+ conf.setInt(GiraphJob.WAITING_REQUEST_MSECS, 2000);
+
+ // Start the service
+ serverData =
+ new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+ (SimpleMessageStore.newFactory(
+ MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
+ server =
+ new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
+ conf, serverData);
+ server.start();
+ client =
+ new NettyClient<IntWritable, IntWritable, IntWritable, IntWritable>
+ (context);
+ client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
+
+ // Send 2 requests; each should only be processed once
+ WritableRequest<IntWritable, IntWritable, IntWritable,
+ IntWritable> request1 = getRequest();
+ WritableRequest<IntWritable, IntWritable, IntWritable,
+ IntWritable> request2 = getRequest();
+ client.sendWritableRequest(-1, server.getMyAddress(), request1);
+ client.sendWritableRequest(-1, server.getMyAddress(), request2);
+ client.waitAllRequests();
+
+ // Stop the service
+ client.stop();
+ server.stop();
+
+ // Check the output (each of the 2 requests should be processed exactly once)
+ checkResult(2);
}
}
Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Tue Aug 21 22:17:56 2012
@@ -127,7 +127,7 @@ public class RequestTest {
IntWritable> request =
new SendVertexRequest<IntWritable, IntWritable,
IntWritable, IntWritable>(partitionId, vertices);
- client.sendWritableRequest(server.getMyAddress(), request);
+ client.sendWritableRequest(-1, server.getMyAddress(), request);
client.waitAllRequests();
// Stop the service
@@ -170,7 +170,7 @@ public class RequestTest {
IntWritable> request =
new SendPartitionMessagesRequest<IntWritable, IntWritable,
IntWritable, IntWritable>(partitionId, vertexIdMessages);
- client.sendWritableRequest(server.getMyAddress(), request);
+ client.sendWritableRequest(-1, server.getMyAddress(), request);
client.waitAllRequests();
// Stop the service
@@ -233,7 +233,7 @@ public class RequestTest {
IntWritable> request =
new SendPartitionMutationsRequest<IntWritable, IntWritable,
IntWritable, IntWritable>(partitionId, vertexIdMutations);
- client.sendWritableRequest(server.getMyAddress(), request);
+ client.sendWritableRequest(-1, server.getMyAddress(), request);
client.waitAllRequests();
// Stop the service