You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ed...@apache.org on 2017/03/01 22:06:08 UTC

git commit: updated refs/heads/trunk to 18c67ca

Repository: giraph
Updated Branches:
  refs/heads/trunk f37f373a0 -> 18c67ca3c


GIRAPH-1132

closes #21


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/18c67ca3
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/18c67ca3
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/18c67ca3

Branch: refs/heads/trunk
Commit: 18c67ca3c221cf2961d59ac19a245762f4dd8f7d
Parents: f37f373
Author: Sergey Edunov <ed...@fb.com>
Authored: Wed Mar 1 14:05:54 2017 -0800
Committer: Sergey Edunov <ed...@fb.com>
Committed: Wed Mar 1 14:05:54 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/giraph/bsp/BspService.java  |  3 ++-
 .../org/apache/giraph/conf/GiraphConstants.java | 23 ++++++++++++++++++++
 .../apache/giraph/master/BspServiceMaster.java  |  8 +++++--
 .../apache/giraph/worker/BspServiceWorker.java  | 16 ++++++++++----
 .../java/org/apache/giraph/zk/BspEvent.java     |  5 +++--
 .../org/apache/giraph/zk/PredicateLock.java     |  6 ++++-
 .../org/apache/giraph/zk/TestPredicateLock.java |  6 ++---
 7 files changed, 53 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/18c67ca3/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index a2caf81..976997f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -281,7 +281,8 @@ public abstract class BspService<I extends WritableComparable,
                                  conf.getZookeeperOpsRetryWaitMsecs(),
                                  this,
                                  context);
-      connectedEvent.waitForever();
+      connectedEvent.waitForTimeoutOrFail(
+          GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(conf));
       this.fs = FileSystem.get(getConfiguration());
     } catch (IOException e) {
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/giraph/blob/18c67ca3/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index c7b04d8..3a3e8dc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.OutputFormat;
 
+import static java.util.concurrent.TimeUnit.HOURS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
@@ -1238,5 +1239,27 @@ public interface GiraphConstants {
   BooleanConfOption PREFER_IP_ADDRESSES =
       new BooleanConfOption("giraph.preferIP", false,
       "Prefer IP addresses instead of host names");
+
+  /**
+   * Timeout for "waitForever"-style loops that wait for zookeeper.
+   * We should never really have to wait forever, so we only wait
+   * for some reasonable but large amount of time.
+   */
+  LongConfOption WAIT_ZOOKEEPER_TIMEOUT_MSEC =
+      new LongConfOption("giraph.waitZookeeperTimeoutMsec",
+          MINUTES.toMillis(15),
+          "How long should we stay in waitForever loops in various " +
+              "places that require network connection");
+
+  /**
+   * Timeout for "waitForever"-style loops that wait for other workers
+   * to complete their job.
+   * We should never really have to wait forever, so we only wait
+   * for some reasonable but large amount of time.
+   */
+  LongConfOption WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC =
+      new LongConfOption("giraph.waitForOtherWorkersMsec",
+          HOURS.toMillis(48),
+          "How long should workers wait to finish superstep");
 }
 // CHECKSTYLE: resume InterfaceIsTypeCheck

http://git-wip-us.apache.org/repos/asf/giraph/blob/18c67ca3/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 971e266..6b64cf5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -863,7 +863,9 @@ public class BspServiceMaster<I extends WritableComparable,
           return isMaster;
         }
         LOG.info("becomeMaster: Waiting to become the master...");
-        getMasterElectionChildrenChangedEvent().waitForever();
+        getMasterElectionChildrenChangedEvent().waitForTimeoutOrFail(
+            GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(
+                getConfiguration()));
         getMasterElectionChildrenChangedEvent().reset();
       } catch (KeeperException e) {
         throw new IllegalStateException(
@@ -1832,7 +1834,9 @@ public class BspServiceMaster<I extends WritableComparable,
         return;
       }
 
-      getCleanedUpChildrenChangedEvent().waitForever();
+      getCleanedUpChildrenChangedEvent().waitForTimeoutOrFail(
+          GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(
+              getConfiguration()));
       getCleanedUpChildrenChangedEvent().reset();
     }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/18c67ca3/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 5b00eb7..b6b9c12 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -448,7 +448,9 @@ public class BspServiceWorker<I extends WritableComparable,
       if (inputSplitsDoneStat != null) {
         break;
       }
-      getInputSplitsAllDoneEvent().waitForever();
+      getInputSplitsAllDoneEvent().waitForTimeoutOrFail(
+          GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
+              getConfiguration()));
       getInputSplitsAllDoneEvent().reset();
     }
   }
@@ -647,7 +649,9 @@ else[HADOOP_NON_SECURE]*/
           "from previous failure): " + myHealthPath +
           ".  Waiting for change in attempts " +
           "to re-join the application");
-      getApplicationAttemptChangedEvent().waitForever();
+      getApplicationAttemptChangedEvent().waitForTimeoutOrFail(
+          GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(
+              getConfiguration()));
       if (LOG.isInfoEnabled()) {
         LOG.info("registerHealth: Got application " +
             "attempt changed event, killing self");
@@ -868,7 +872,9 @@ else[HADOOP_NON_SECURE]*/
   private void waitForOtherWorkers(String superstepFinishedNode) {
     try {
       while (getZkExt().exists(superstepFinishedNode, true) == null) {
-        getSuperstepFinishedEvent().waitForever();
+        getSuperstepFinishedEvent().waitForTimeoutOrFail(
+            GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
+                getConfiguration()));
         getSuperstepFinishedEvent().reset();
       }
     } catch (KeeperException e) {
@@ -1683,7 +1689,9 @@ else[HADOOP_NON_SECURE]*/
           LOG.info("exchangeVertexPartitions: Waiting for workers " +
               workerIdSet);
         }
-        getPartitionExchangeChildrenChangedEvent().waitForever();
+        getPartitionExchangeChildrenChangedEvent().waitForTimeoutOrFail(
+            GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
+                getConfiguration()));
         getPartitionExchangeChildrenChangedEvent().reset();
       }
     } catch (KeeperException | InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/18c67ca3/giraph-core/src/main/java/org/apache/giraph/zk/BspEvent.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/BspEvent.java b/giraph-core/src/main/java/org/apache/giraph/zk/BspEvent.java
index 7d22f9a..d3b8e30 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/BspEvent.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/BspEvent.java
@@ -43,7 +43,8 @@ public interface BspEvent {
   boolean waitMsecs(int msecs);
 
   /**
-   * Wait indefinitely until the event occurs.
+   * Waits for the event; throws a runtime exception if the timeout passes.
+   * @param timeout Maximum time to wait, in milliseconds.
    */
-  void waitForever();
+  void waitForTimeoutOrFail(long timeout);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/18c67ca3/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java b/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java
index 4937f82..982c417 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java
@@ -134,8 +134,12 @@ public class PredicateLock implements BspEvent {
   }
 
   @Override
-  public void waitForever() {
+  public void waitForTimeoutOrFail(long timeout) {
+    long t0 = System.currentTimeMillis();
     while (!waitMsecs(msecPeriod)) {
+      if (System.currentTimeMillis() > t0 + timeout) {
+        throw new RuntimeException("Timeout waiting");
+      }
       progressable.progress();
     }
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/18c67ca3/giraph-core/src/test/java/org/apache/giraph/zk/TestPredicateLock.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/zk/TestPredicateLock.java b/giraph-core/src/test/java/org/apache/giraph/zk/TestPredicateLock.java
index c90be93..33b7071 100644
--- a/giraph-core/src/test/java/org/apache/giraph/zk/TestPredicateLock.java
+++ b/giraph-core/src/test/java/org/apache/giraph/zk/TestPredicateLock.java
@@ -27,8 +27,6 @@ import static org.mockito.Mockito.when;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.giraph.time.Time;
-import org.apache.giraph.zk.BspEvent;
-import org.apache.giraph.zk.PredicateLock;
 import org.apache.hadoop.util.Progressable;
 import org.junit.Before;
 import org.junit.Test;
@@ -111,14 +109,14 @@ public class TestPredicateLock {
   }
 
   /**
-   * Thread signaled test for {@link PredicateLock#waitForever()}
+   * Thread signaled test for {@link PredicateLock#waitForTimeoutOrFail(long)}
    */
   @Test
   public void testWaitForever() {
     BspEvent event = new PredicateLock(getStubProgressable());
     Thread signalThread = new SignalThread(event);
     signalThread.start();
-    event.waitForever();
+    event.waitForTimeoutOrFail(5 * 60_000);
     try {
       signalThread.join();
     } catch (InterruptedException e) {