You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2016/08/25 22:22:38 UTC

git commit: updated refs/heads/trunk to a15675c

Repository: giraph
Updated Branches:
  refs/heads/trunk 693a71c85 -> a15675cae


GIRAPH-1103: Another try to fix jobs getting stuck after channel failure

Summary:
With GIRAPH-1087 we see jobs stuck after channel failure less often, but it still happens. There are several additional issues I found: requests failing to send in the first place, so they never get retried, and callbacks for channel failures not always being triggered.
Added a thread which will periodically check on open requests even when we are not waiting on all open requests (since in many places we don't), removed the check that the request was sent when retrying it, and added some thread utils while at it.

Test Plan: Before the change, failure rate of a particular job was about 1 in 50. Had over 200 successful runs with this change.

Differential Revision: https://reviews.facebook.net/D61719


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/a15675ca
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/a15675ca
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/a15675ca

Branch: refs/heads/trunk
Commit: a15675cae4c91d0fe0496a879de4242ffbc5f9ce
Parents: 693a71c
Author: Maja Kabiljo <ma...@fb.com>
Authored: Mon Aug 8 11:13:35 2016 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Thu Aug 25 15:22:17 2016 -0700

----------------------------------------------------------------------
 .../apache/giraph/comm/netty/NettyClient.java   | 80 +++++++++++---------
 .../job/DefaultJobProgressTrackerService.java   | 28 ++-----
 .../apache/giraph/utils/JMapHistoDumper.java    | 12 +--
 .../giraph/utils/ReactiveJMapHistoDumper.java   | 23 ++----
 .../org/apache/giraph/utils/ThreadUtils.java    | 37 +++++++++
 .../apache/giraph/worker/MemoryObserver.java    | 13 +---
 .../giraph/worker/WorkerProgressWriter.java     | 21 ++---
 .../giraph/zk/InProcessZooKeeperRunner.java     |  7 +-
 8 files changed, 115 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/a15675ca/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 785d906..541ce93 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -385,6 +385,18 @@ public class NettyClient {
             checkRequestsAfterChannelFailure(ctx.channel());
           }
         });
+
+    // Start a thread which will observe if there are any problems with open
+    // requests
+    ThreadUtils.startThread(new Runnable() {
+      @Override
+      public void run() {
+        while (true) {
+          ThreadUtils.trySleep(waitingRequestMsecs);
+          checkRequestsForProblems();
+        }
+      }
+    }, "open-requests-observer");
   }
 
   /**
@@ -715,11 +727,7 @@ public class NettyClient {
           " on attempt " + reconnectFailures + " out of " +
           maxConnectionFailures + " max attempts, sleeping for 5 secs",
           connectionFuture.cause());
-      try {
-        Thread.sleep(5000);
-      } catch (InterruptedException e) {
-        LOG.warn("getNextChannel: Unexpected interrupted exception", e);
-      }
+      ThreadUtils.trySleep(5000);
     }
     throw new IllegalStateException("getNextChannel: Failed to connect " +
         "to " + remoteServer + " in " + reconnectFailures +
@@ -758,7 +766,6 @@ public class NettyClient {
     }
 /*end[HADOOP_NON_SECURE]*/
 
-    Channel channel = getNextChannel(remoteServer);
     RequestInfo newRequestInfo = new RequestInfo(remoteServer, request);
     if (registerRequest) {
       request.setClientId(myTaskInfo.getTaskId());
@@ -780,13 +787,23 @@ public class NettyClient {
         ", size " + request.getSerializedSize() +
         " bytes. Check netty buffer size.");
     }
-    ChannelFuture writeFuture = channel.write(request);
-    newRequestInfo.setWriteFuture(writeFuture);
-    writeFuture.addListener(logErrorListener);
+    writeRequestToChannel(newRequestInfo);
     return requestId;
   }
 
   /**
+   * Write request to a channel for its destination
+   *
+   * @param requestInfo Request info
+   */
+  private void writeRequestToChannel(RequestInfo requestInfo) {
+    Channel channel = getNextChannel(requestInfo.getDestinationAddress());
+    ChannelFuture writeFuture = channel.write(requestInfo.getRequest());
+    requestInfo.setWriteFuture(writeFuture);
+    writeFuture.addListener(logErrorListener);
+  }
+
+  /**
    * Handle receipt of a message. Called by response handler.
    *
    * @param senderId Id of sender of the message
@@ -866,7 +883,6 @@ public class NettyClient {
     logInfoAboutOpenRequests();
     // Make sure that waiting doesn't kill the job
     context.progress();
-    checkRequestsForProblems();
   }
 
   /**
@@ -946,9 +962,9 @@ public class NettyClient {
         ChannelFuture writeFuture = requestInfo.getWriteFuture();
         // If not connected anymore, request failed, or the request is taking
         // too long, re-establish and resend
-        return !writeFuture.channel().isActive() ||
-          (writeFuture.isDone() && !writeFuture.isSuccess()) ||
-          (requestInfo.getElapsedMsecs() > maxRequestMilliseconds);
+        return (writeFuture != null && (!writeFuture.channel().isActive() ||
+            (writeFuture.isDone() && !writeFuture.isSuccess()))) ||
+            (requestInfo.getElapsedMsecs() > maxRequestMilliseconds);
       }
     });
   }
@@ -969,21 +985,23 @@ public class NettyClient {
     for (Map.Entry<ClientRequestId, RequestInfo> entry :
         clientRequestIdRequestInfoMap.entrySet()) {
       RequestInfo requestInfo = entry.getValue();
-      ChannelFuture writeFuture = requestInfo.getWriteFuture();
-      // Request wasn't sent yet
-      if (writeFuture == null) {
-        continue;
-      }
       // If request should be resent
       if (shouldResendRequestPredicate.apply(requestInfo)) {
+        ChannelFuture writeFuture = requestInfo.getWriteFuture();
+        String logMessage;
+        if (writeFuture == null) {
+          logMessage = "wasn't sent successfully";
+        } else {
+          logMessage = "connected = " +
+              writeFuture.channel().isActive() +
+              ", future done = " + writeFuture.isDone() + ", " +
+              "success = " + writeFuture.isSuccess() + ", " +
+              "cause = " + writeFuture.cause();
+        }
         LOG.warn("checkRequestsForProblems: Problem with request id " +
-            entry.getKey() + " connected = " +
-            writeFuture.channel().isActive() +
-            ", future done = " + writeFuture.isDone() + ", " +
-            "success = " + writeFuture.isSuccess() + ", " +
-            "cause = " + writeFuture.cause() + ", " +
+            entry.getKey() + ", " + logMessage + ", " +
             "elapsed time = " + requestInfo.getElapsedMsecs() + ", " +
-            "destination = " + writeFuture.channel().remoteAddress() +
+            "destination = " + requestInfo.getDestinationAddress() +
             " " + requestInfo);
         addedRequestIds.add(entry.getKey());
         addedRequestInfos.add(new RequestInfo(
@@ -1001,14 +1019,10 @@ public class NettyClient {
             " completed prior to sending the next request");
         clientRequestIdRequestInfoMap.remove(requestId);
       }
-      InetSocketAddress remoteServer = requestInfo.getDestinationAddress();
-      Channel channel = getNextChannel(remoteServer);
       if (LOG.isInfoEnabled()) {
         LOG.info("checkRequestsForProblems: Re-issuing request " + requestInfo);
       }
-      ChannelFuture writeFuture = channel.write(requestInfo.getRequest());
-      requestInfo.setWriteFuture(writeFuture);
-      writeFuture.addListener(logErrorListener);
+      writeRequestToChannel(requestInfo);
     }
     addedRequestIds.clear();
     addedRequestInfos.clear();
@@ -1035,11 +1049,7 @@ public class NettyClient {
       LOG.warn("resolveAddress: Failed to resolve " + address +
           " on attempt " + resolveAttempts + " of " +
           maxResolveAddressAttempts + " attempts, sleeping for 5 seconds");
-      try {
-        Thread.sleep(5000);
-      } catch (InterruptedException e) {
-        LOG.warn("resolveAddress: Interrupted.", e);
-      }
+      ThreadUtils.trySleep(5000);
       address = new InetSocketAddress(hostOrIp,
           address.getPort());
     }
@@ -1080,7 +1090,7 @@ public class NettyClient {
     resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
       @Override
       public boolean apply(RequestInfo requestInfo) {
-        return requestInfo.getWriteFuture().channel().remoteAddress().equals(
+        return requestInfo.getDestinationAddress().equals(
             channel.remoteAddress());
       }
     });

http://git-wip-us.apache.org/repos/asf/giraph/blob/a15675ca/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
index adca42b..9e836dc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
@@ -20,6 +20,7 @@ package org.apache.giraph.job;
 
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.utils.ThreadUtils;
 import org.apache.giraph.worker.WorkerProgress;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.log4j.Logger;
@@ -72,7 +73,7 @@ public class DefaultJobProgressTrackerService
    * Start the thread which writes progress periodically
    */
   private void startWriterThread() {
-    writerThread = new Thread(new Runnable() {
+    writerThread = ThreadUtils.startThread(new Runnable() {
       @Override
       public void run() {
         while (!finished) {
@@ -89,19 +90,12 @@ public class DefaultJobProgressTrackerService
               break;
             }
           }
-          try {
-            Thread.sleep(UPDATE_MILLISECONDS);
-          } catch (InterruptedException e) {
-            if (LOG.isInfoEnabled()) {
-              LOG.info("Progress thread interrupted");
-            }
+          if (!ThreadUtils.trySleep(UPDATE_MILLISECONDS)) {
             break;
           }
         }
       }
-    });
-    writerThread.setDaemon(true);
-    writerThread.start();
+    }, "progress-writer");
   }
 
   @Override
@@ -119,11 +113,10 @@ public class DefaultJobProgressTrackerService
         GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf);
     if (maxAllowedJobTimeMs > 0) {
       // Start a thread which will kill the job if running for too long
-      Thread killThread = new Thread(new Runnable() {
+      ThreadUtils.startThread(new Runnable() {
         @Override
         public void run() {
-          try {
-            Thread.sleep(maxAllowedJobTimeMs);
+          if (ThreadUtils.trySleep(maxAllowedJobTimeMs)) {
             try {
               LOG.warn("Killing job because it took longer than " +
                   maxAllowedJobTimeMs + " milliseconds");
@@ -131,16 +124,9 @@ public class DefaultJobProgressTrackerService
             } catch (IOException e) {
               LOG.warn("Failed to kill job", e);
             }
-          } catch (InterruptedException e) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Thread checking for jobs max allowed time " +
-                  "interrupted");
-            }
           }
         }
-      });
-      killThread.setDaemon(true);
-      killThread.start();
+      }, "job-runtime-observer");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/a15675ca/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
index a68f6c4..674929a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
@@ -82,21 +82,15 @@ public class JMapHistoDumper implements MasterObserver, WorkerObserver {
    */
   public void startJMapThread() {
     stop = false;
-    thread = new Thread(new Runnable() {
+    thread = ThreadUtils.startThread(new Runnable() {
       @Override
       public void run() {
         while (!stop) {
           JMap.heapHistogramDump(linesToPrint, liveObjectsOnly);
-          try {
-            Thread.sleep(sleepMillis);
-          } catch (InterruptedException e) {
-            LOG.info("JMap histogram sleep interrupted", e);
-          }
+          ThreadUtils.trySleep(sleepMillis);
         }
       }
-    });
-    thread.setDaemon(true);
-    thread.start();
+    }, "jmap-dumper");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/a15675ca/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java b/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java
index f3ecfb0..778311c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java
@@ -89,26 +89,19 @@ public class ReactiveJMapHistoDumper extends
   public void startSupervisorThread() {
     stop = false;
     final Runtime runtime = Runtime.getRuntime();
-    thread = new Thread(new Runnable() {
+    thread = ThreadUtils.startThread(new Runnable() {
       @Override
       public void run() {
-        try {
-          while (!stop) {
-            long potentialMemory = (runtime.maxMemory() -
-                runtime.totalMemory()) + runtime.freeMemory();
-            if (potentialMemory / MB < minFreeMemory) {
-              JMap.heapHistogramDump(linesToPrint);
-            }
-            Thread.sleep(sleepMillis);
+        while (!stop) {
+          long potentialMemory = (runtime.maxMemory() -
+              runtime.totalMemory()) + runtime.freeMemory();
+          if (potentialMemory / MB < minFreeMemory) {
+            JMap.heapHistogramDump(linesToPrint);
           }
-        } catch (InterruptedException e) {
-          LOG.warn("JMap histogram sleep interrupted", e);
+          ThreadUtils.trySleep(sleepMillis);
         }
       }
-    });
-    thread.setName("ReactiveJMapHistoDumperSupervisorThread");
-    thread.setDaemon(true);
-    thread.start();
+    }, "ReactiveJMapHistoDumperSupervisorThread");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/a15675ca/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
index 83eca14..0971402 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
@@ -17,6 +17,8 @@
  */
 package org.apache.giraph.utils;
 
+import org.apache.log4j.Logger;
+
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import java.util.concurrent.Callable;
@@ -28,6 +30,8 @@ import java.util.concurrent.ThreadFactory;
  * Utility class for thread related functions.
  */
 public class ThreadUtils {
+  /** Logger */
+  private static final Logger LOG = Logger.getLogger(ThreadUtils.class);
 
   /**
    * Utility class. Do not inherit or create objects.
@@ -86,4 +90,37 @@ public class ThreadUtils {
     return executorService.submit(
         new LogStacktraceCallable<>(callable, uncaughtExceptionHandler));
   }
+
+  /**
+   * Start thread with specified name and runnable, and make it daemon
+   *
+   * @param threadName Name of the thread
+   * @param runnable Runnable to execute
+   * @return Thread
+   */
+  public static Thread startThread(Runnable runnable, String threadName) {
+    Thread thread = new Thread(runnable, threadName);
+    thread.setDaemon(true);
+    thread.start();
+    return thread;
+  }
+
+  /**
+   * Sleep for specified milliseconds, logging and ignoring interrupted
+   * exceptions
+   *
+   * @param millis How long to sleep for
+   * @return Whether the sleep was successful or the thread was interrupted
+   */
+  public static boolean trySleep(long millis) {
+    try {
+      Thread.sleep(millis);
+      return true;
+    } catch (InterruptedException e) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Thread interrupted");
+      }
+      return false;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a15675ca/giraph-core/src/main/java/org/apache/giraph/worker/MemoryObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MemoryObserver.java b/giraph-core/src/main/java/org/apache/giraph/worker/MemoryObserver.java
index 50fae81..3059c4d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/MemoryObserver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/MemoryObserver.java
@@ -23,6 +23,7 @@ import org.apache.giraph.conf.FloatConfOption;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.IntConfOption;
 import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.utils.ThreadUtils;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.CreateMode;
@@ -91,7 +92,7 @@ public class MemoryObserver {
 
     final float freeMemoryFractionForGc =
         FREE_MEMORY_FRACTION_FOR_GC.get(conf);
-    Thread thread = new Thread(new Runnable() {
+    ThreadUtils.startThread(new Runnable() {
       @Override
       public void run() {
 
@@ -115,18 +116,12 @@ public class MemoryObserver {
               LOG.warn("Exception occurred", e);
             }
           }
-          try {
-            Thread.sleep(MEMORY_OBSERVER_SLEEP_MS);
-          } catch (InterruptedException e) {
-            LOG.warn("Exception occurred", e);
+          if (!ThreadUtils.trySleep(MEMORY_OBSERVER_SLEEP_MS)) {
             return;
           }
         }
       }
-    });
-    thread.setName("memory-observer");
-    thread.setDaemon(true);
-    thread.start();
+    }, "memory-observer");
   }
 
   /** Set watcher on memory observer folder */

http://git-wip-us.apache.org/repos/asf/giraph/blob/a15675ca/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
index f37a48d..c5dfcab 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.worker;
 
 import org.apache.giraph.job.JobProgressTracker;
+import org.apache.giraph.utils.ThreadUtils;
 import org.apache.log4j.Logger;
 
 /**
@@ -46,25 +47,19 @@ public class WorkerProgressWriter {
    */
   public WorkerProgressWriter(JobProgressTracker jobProgressTracker) {
     this.jobProgressTracker = jobProgressTracker;
-    writerThread = new Thread(new Runnable() {
+    writerThread = ThreadUtils.startThread(new Runnable() {
       @Override
       public void run() {
-        try {
-          while (!finished) {
-            updateAndSendProgress();
-            double factor = 1 + Math.random();
-            Thread.sleep((long) (WRITE_UPDATE_PERIOD_MILLISECONDS * factor));
-          }
-        } catch (InterruptedException e) {
-          // Thread is interrupted when stop is called, we can just log this
-          if (LOG.isInfoEnabled()) {
-            LOG.info("run: WorkerProgressWriter interrupted");
+        while (!finished) {
+          updateAndSendProgress();
+          double factor = 1 + Math.random();
+          if (!ThreadUtils.trySleep(
+              (long) (WRITE_UPDATE_PERIOD_MILLISECONDS * factor))) {
+            break;
           }
         }
       }
     }, "workerProgressThread");
-    writerThread.setDaemon(true);
-    writerThread.start();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/a15675ca/giraph-core/src/main/java/org/apache/giraph/zk/InProcessZooKeeperRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/InProcessZooKeeperRunner.java b/giraph-core/src/main/java/org/apache/giraph/zk/InProcessZooKeeperRunner.java
index cee2c78..9ac6907 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/InProcessZooKeeperRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/InProcessZooKeeperRunner.java
@@ -19,6 +19,7 @@ package org.apache.giraph.zk;
 
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.utils.ThreadUtils;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.jmx.ManagedUtil;
 import org.apache.zookeeper.server.DatadirCleanupManager;
@@ -147,7 +148,7 @@ public class InProcessZooKeeperRunner
       }
 
       runFromConfig(config);
-      Thread zkThread = new Thread(new Runnable() {
+      ThreadUtils.startThread(new Runnable() {
         @Override
         public void run() {
           try {
@@ -160,9 +161,7 @@ public class InProcessZooKeeperRunner
           }
 
         }
-      });
-      zkThread.setDaemon(true);
-      zkThread.start();
+      }, "zk-thread");
       return zkServer.getClientPort();
     }