You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by di...@apache.org on 2019/12/16 20:41:18 UTC
[giraph] branch trunk updated: GIRAPH-1230
This is an automated email from the ASF dual-hosted git repository.
dionysios pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/giraph.git
The following commit(s) were added to refs/heads/trunk by this push:
new f8d017e GIRAPH-1230
f8d017e is described below
commit f8d017e61d66ec56b17ecf796743d6851c2f0988
Author: Dionysios Logothetis <dl...@gmail.com>
AuthorDate: Mon Dec 16 12:40:54 2019 -0800
GIRAPH-1230
closes #118
---
checkstyle.xml | 2 +-
.../apache/giraph/comm/netty/ChannelRotater.java | 9 ++++--
.../org/apache/giraph/comm/netty/NettyClient.java | 37 ++++++++++++----------
.../org/apache/giraph/graph/GraphTaskManager.java | 28 ++++++++++++++++
.../org/apache/giraph/worker/BspServiceWorker.java | 12 ++++++-
5 files changed, 66 insertions(+), 22 deletions(-)
diff --git a/checkstyle.xml b/checkstyle.xml
index e0e604c..f820d74 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -227,7 +227,7 @@
</module>
<!-- Over time, we will revised this down -->
<module name="MethodLength">
- <property name="max" value="200"/>
+ <property name="max" value="210"/>
</module>
<module name="ParameterNumber">
<property name="max" value="8"/>
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java
index 54c0b50..53af9c0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java
@@ -25,12 +25,15 @@ import com.google.common.collect.Lists;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import org.apache.log4j.Logger;
/**
* Maintains multiple channels and rotates between them. This is thread-safe.
*/
public class ChannelRotater {
+ /** Logger */
+ private static final Logger LOG = Logger.getLogger(ChannelRotater.class);
/** Index of last used channel */
private int index = 0;
/** Channel list */
@@ -73,9 +76,9 @@ public class ChannelRotater {
*/
public synchronized Channel nextChannel() {
if (channelList.isEmpty()) {
- throw new IllegalArgumentException(
- "nextChannel: No channels exist for hostname " +
- address.getHostName());
+ LOG.warn("nextChannel: No channels exist for hostname " +
+ address.getHostName());
+ return null;
}
++index;
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index c8ccea2..103a8ec 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -242,6 +242,11 @@ public class NettyClient {
/** How many network requests were resent because connection failed */
private final GiraphHadoopCounter networkRequestsResentForConnectionFailure;
+ /**
+ * Keeps track of the number of reconnect failures. Once this exceeds the
+ * value of {@link #maxConnectionFailures}, the job will fail.
+ */
+ private int reconnectFailures = 0;
/**
* Only constructor
@@ -764,26 +769,25 @@ public class NettyClient {
private Channel getNextChannel(InetSocketAddress remoteServer) {
Channel channel = addressChannelMap.get(remoteServer).nextChannel();
if (channel == null) {
- throw new IllegalStateException(
- "getNextChannel: No channel exists for " + remoteServer);
- }
-
- // Return this channel if it is connected
- if (channel.isActive()) {
- return channel;
- }
+ LOG.warn("getNextChannel: No channel exists for " + remoteServer);
+ } else {
+ // Return this channel if it is connected
+ if (channel.isActive()) {
+ return channel;
+ }
- // Get rid of the failed channel
- if (addressChannelMap.get(remoteServer).removeChannel(channel)) {
- LOG.warn("getNextChannel: Unlikely event that the channel " +
+ // Get rid of the failed channel
+ if (addressChannelMap.get(remoteServer).removeChannel(channel)) {
+ LOG.warn("getNextChannel: Unlikely event that the channel " +
channel + " was already removed!");
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("getNextChannel: Fixing disconnected channel to " +
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("getNextChannel: Fixing disconnected channel to " +
remoteServer + ", open = " + channel.isOpen() + ", " +
"bound = " + channel.isRegistered());
+ }
}
- int reconnectFailures = 0;
+
while (reconnectFailures < maxConnectionFailures) {
ChannelFuture connectionFuture = bootstrap.connect(remoteServer);
try {
@@ -1205,7 +1209,7 @@ public class NettyClient {
* This listener class just dumps exception stack traces if
* something happens.
*/
- private class LogOnErrorChannelFutureListener
+ private static class LogOnErrorChannelFutureListener
implements ChannelFutureListener {
@Override
@@ -1213,7 +1217,6 @@ public class NettyClient {
if (future.isDone() && !future.isSuccess()) {
LOG.error("Channel failed channelId=" + future.channel().hashCode(),
future.cause());
- checkRequestsAfterChannelFailure(future.channel());
}
}
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 54d3084..6db1934 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -1075,6 +1075,18 @@ end[PURE_YARN]*/
getConf()), getJobProgressTracker());
}
+ /**
+ * Creates exception handler with the passed implementation of
+ * {@link CheckerIfWorkerShouldFailAfterException}.
+ *
+ * @param checker Instance that checks whether the job should fail.
+ * @return Exception handler.
+ */
+ public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler(
+ CheckerIfWorkerShouldFailAfterException checker) {
+ return new OverrideExceptionHandler(checker, getJobProgressTracker());
+ }
+
public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
return conf;
}
@@ -1128,6 +1140,9 @@ end[PURE_YARN]*/
@Override
public void uncaughtException(final Thread t, final Throwable e) {
if (!checker.checkIfWorkerShouldFail(t, e)) {
+ LOG.error(
+ "uncaughtException: OverrideExceptionHandler on thread " +
+ t.getName() + ", msg = " + e.getMessage(), e);
return;
}
try {
@@ -1169,4 +1184,17 @@ end[PURE_YARN]*/
return true;
}
}
+
+ /**
+ * Checks the message of a throwable, and checks whether it is a
+ * "connection reset by peer" type of exception.
+ *
+ * @param throwable Throwable
+ * @return True if the throwable is a "connection reset by peer",
+ * false otherwise.
+ */
+ public static boolean isConnectionResetByPeer(Throwable throwable) {
+ return throwable.getMessage().startsWith(
+ "Connection reset by peer") ? true : false;
+ }
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index b6756c9..a745b1e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -121,6 +121,8 @@ import org.json.JSONObject;
import com.google.common.collect.Lists;
+import static org.apache.giraph.graph.GraphTaskManager.isConnectionResetByPeer;
+
/**
* ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
*
@@ -217,7 +219,15 @@ public class BspServiceWorker<I extends WritableComparable,
getGraphPartitionerFactory().createWorkerGraphPartitioner();
workerInfo = new WorkerInfo();
workerServer = new NettyWorkerServer<I, V, E>(conf, this, context,
- graphTaskManager.createUncaughtExceptionHandler());
+ graphTaskManager.createUncaughtExceptionHandler(
+ (thread, throwable) -> {
+ // If the connection was closed by the client, then we just log
+ // the error, we do not fail the job, since the client will
+ // attempt to reconnect.
+ return !isConnectionResetByPeer(throwable);
+ }
+ )
+ );
workerInfo.setInetSocketAddress(workerServer.getMyAddress(),
workerServer.getLocalHostOrIp());
workerInfo.setTaskId(getTaskId());