You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by he...@apache.org on 2017/03/27 18:22:45 UTC

git commit: updated refs/heads/trunk to 2173d87

Repository: giraph
Updated Branches:
  refs/heads/trunk a1d546f7a -> 2173d87cf


JIRA-1137

closes #26


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/2173d87c
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/2173d87c
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/2173d87c

Branch: refs/heads/trunk
Commit: 2173d87cf4aa84937abe438d0f836c8fdd3ba1c1
Parents: a1d546f
Author: Hassan Eslami <he...@apache.org>
Authored: Mon Mar 27 13:22:09 2017 -0500
Committer: Hassan Eslami <ha...@wirelessprv-10-193-225-240.near.illinois.edu>
Committed: Mon Mar 27 13:22:09 2017 -0500

----------------------------------------------------------------------
 .../flow_control/CreditBasedFlowControl.java    | 68 +++++++++++++++-----
 .../ooc/policy/MemoryEstimatorOracle.java       | 11 ++--
 .../giraph/ooc/policy/ThresholdBasedOracle.java | 11 ++--
 .../org/apache/giraph/utils/ThreadUtils.java    | 18 ++++++
 4 files changed, 79 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/2173d87c/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
index 18cf017..1e06925 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
@@ -23,6 +23,7 @@ import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.giraph.comm.netty.NettyClient;
@@ -32,6 +33,7 @@ import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.conf.IntConfOption;
 import org.apache.giraph.utils.AdjustableSemaphore;
+import org.apache.giraph.utils.ThreadUtils;
 import org.apache.log4j.Logger;
 
 import java.util.ArrayDeque;
@@ -41,6 +43,8 @@ import java.util.Comparator;
 import java.util.Deque;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -166,6 +170,14 @@ public class CreditBasedFlowControl implements FlowControl {
   private final ConcurrentMap<Integer, Set<Long>> resumeRequestsId =
       Maps.newConcurrentMap();
   /**
+   * Queue of the cached requests to be sent out. The queue keeps pairs of
+   * (destination id, request). A thread-safe blocking queue is used here for
+   * the sake of simplicity. The queue should be bounded (with a capacity of
+   * no more than the user-defined max number of unsent/cached requests) to
+   * avoid an excessive memory footprint.
+   */
+  private final BlockingQueue<Pair<Integer, WritableRequest>> toBeSent;
+  /**
    * Semaphore to control number of cached unsent requests. Maximum number of
    * permits of this semaphore should be equal to MAX_NUM_OF_UNSENT_REQUESTS.
    */
@@ -180,7 +192,7 @@ public class CreditBasedFlowControl implements FlowControl {
    * @param exceptionHandler Exception handler
    */
   public CreditBasedFlowControl(ImmutableClassesGiraphConfiguration conf,
-                                NettyClient nettyClient,
+                                final NettyClient nettyClient,
                                 Thread.UncaughtExceptionHandler
                                     exceptionHandler) {
     this.nettyClient = nettyClient;
@@ -189,10 +201,15 @@ public class CreditBasedFlowControl implements FlowControl {
     checkState(maxOpenRequestsPerWorker < 0x4000 &&
         maxOpenRequestsPerWorker > 0, "NettyClient: max number of open " +
         "requests should be in range (0, " + 0x4FFF + ")");
-    unsentRequestPermit = new Semaphore(MAX_NUM_OF_UNSENT_REQUESTS.get(conf));
+    int maxUnsentRequests = MAX_NUM_OF_UNSENT_REQUESTS.get(conf);
+    unsentRequestPermit = new Semaphore(maxUnsentRequests);
+    this.toBeSent = new ArrayBlockingQueue<Pair<Integer, WritableRequest>>(
+        maxUnsentRequests);
     unsentWaitMsecs = UNSENT_CACHE_WAIT_INTERVAL.get(conf);
     waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
-    Thread thread = new Thread(new Runnable() {
+
+    // Thread to handle/send resume signals when necessary
+    ThreadUtils.startThread(new Runnable() {
       @Override
       public void run() {
         while (true) {
@@ -214,11 +231,31 @@ public class CreditBasedFlowControl implements FlowControl {
           }
         }
       }
-    });
-    thread.setUncaughtExceptionHandler(exceptionHandler);
-    thread.setName("resume-sender");
-    thread.setDaemon(true);
-    thread.start();
+    }, "resume-sender", exceptionHandler);
+
+    // Thread to handle/send cached requests
+    ThreadUtils.startThread(new Runnable() {
+      @Override
+      public void run() {
+        while (true) {
+          Pair<Integer, WritableRequest> pair = null;
+          try {
+            pair = toBeSent.take();
+          } catch (InterruptedException e) {
+            throw new IllegalStateException("run: failed while waiting to " +
+                "take an element from the request queue!", e);
+          }
+          int taskId = pair.getLeft();
+          WritableRequest request = pair.getRight();
+          nettyClient.doSend(taskId, request);
+          if (aggregateUnsentRequests.decrementAndGet() == 0) {
+            synchronized (aggregateUnsentRequests) {
+              aggregateUnsentRequests.notifyAll();
+            }
+          }
+        }
+      }
+    }, "cached-req-sender", exceptionHandler);
   }
 
   /**
@@ -510,13 +547,14 @@ public class CreditBasedFlowControl implements FlowControl {
       }
       unsentRequestPermit.release();
       // At this point, we have a request, and we reserved a credit for the
-      // sender client. So, we send the request to the client and update
-      // the state.
-      nettyClient.doSend(taskId, request);
-      if (aggregateUnsentRequests.decrementAndGet() == 0) {
-        synchronized (aggregateUnsentRequests) {
-          aggregateUnsentRequests.notifyAll();
-        }
+      // sender client. So, we put the request in a queue to be sent to the
+      // client.
+      try {
+        toBeSent.put(
+            new ImmutablePair<Integer, WritableRequest>(taskId, request));
+      } catch (InterruptedException e) {
+        // put() blocks while the bounded queue is full; treat interrupt as fatal
+        throw new IllegalStateException("trySendCachedRequests: failed while" +
+            " waiting to put element in send queue!", e);
+      }
     }
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/2173d87c/giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java
index fd6172c..871ef6c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java
@@ -29,6 +29,7 @@ import org.apache.giraph.ooc.OutOfCoreEngine;
 import org.apache.giraph.ooc.command.IOCommand;
 import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
 import org.apache.giraph.ooc.command.WaitIOCommand;
+import org.apache.giraph.utils.ThreadUtils;
 import org.apache.giraph.worker.EdgeInputSplitsCallable;
 import org.apache.giraph.worker.VertexInputSplitsCallable;
 import org.apache.giraph.worker.WorkerProgress;
@@ -171,7 +172,7 @@ public class MemoryEstimatorOracle implements OutOfCoreOracle {
 
     final long checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);
 
-    Thread thread = new Thread(new Runnable() {
+    ThreadUtils.startThread(new Runnable() {
       @Override
       public void run() {
         while (true) {
@@ -211,12 +212,8 @@ public class MemoryEstimatorOracle implements OutOfCoreOracle {
           }
         }
       }
-    });
-    thread.setUncaughtExceptionHandler(oocEngine.getServiceWorker()
-      .getGraphTaskManager().createUncaughtExceptionHandler());
-    thread.setName("ooc-memory-checker");
-    thread.setDaemon(true);
-    thread.start();
+    }, "ooc-memory-checker", oocEngine.getServiceWorker().getGraphTaskManager()
+        .createUncaughtExceptionHandler());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/2173d87c/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
index 2dd2c10..2f1ba7a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
@@ -26,6 +26,7 @@ import org.apache.giraph.conf.LongConfOption;
 import org.apache.giraph.ooc.OutOfCoreEngine;
 import org.apache.giraph.ooc.command.IOCommand;
 import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.utils.ThreadUtils;
 import org.apache.log4j.Logger;
 
 import static com.google.common.base.Preconditions.checkState;
@@ -170,7 +171,7 @@ public class ThresholdBasedOracle implements OutOfCoreOracle {
     this.oocEngine = oocEngine;
     this.lastMajorGCTime = 0;
 
-    final Thread thread = new Thread(new Runnable() {
+    ThreadUtils.startThread(new Runnable() {
       @Override
       public void run() {
         while (true) {
@@ -207,12 +208,8 @@ public class ThresholdBasedOracle implements OutOfCoreOracle {
           }
         }
       }
-    });
-    thread.setUncaughtExceptionHandler(oocEngine.getServiceWorker()
-        .getGraphTaskManager().createUncaughtExceptionHandler());
-    thread.setName("memory-checker");
-    thread.setDaemon(true);
-    thread.start();
+    }, "memory-checker", oocEngine.getServiceWorker().getGraphTaskManager().
+        createUncaughtExceptionHandler());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/2173d87c/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
index 0971402..21e8e49 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
@@ -106,6 +106,24 @@ public class ThreadUtils {
   }
 
   /**
+   * Create a daemon thread with the given runnable, name and uncaught
+   * exception handler, start it, and return it.
+   *
+   * @param runnable Runnable to execute
+   * @param threadName Name of the thread
+   * @param handler Uncaught exception handler to install on the thread
+   * @return The newly created thread, which has already been started
+   */
+  public static Thread startThread(Runnable runnable, String threadName,
+                                   Thread.UncaughtExceptionHandler handler) {
+    Thread thread = new Thread(runnable, threadName);
+    thread.setUncaughtExceptionHandler(handler);
+    thread.setDaemon(true); // daemon flag must be set before start()
+    thread.start();
+    return thread;
+  }
+
+  /**
    * Sleep for specified milliseconds, logging and ignoring interrupted
    * exceptions
    *