You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ed...@apache.org on 2017/03/01 22:06:08 UTC
git commit: updated refs/heads/trunk to 18c67ca
Repository: giraph
Updated Branches:
refs/heads/trunk f37f373a0 -> 18c67ca3c
GIRAPH-1132
closes #21
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/18c67ca3
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/18c67ca3
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/18c67ca3
Branch: refs/heads/trunk
Commit: 18c67ca3c221cf2961d59ac19a245762f4dd8f7d
Parents: f37f373
Author: Sergey Edunov <ed...@fb.com>
Authored: Wed Mar 1 14:05:54 2017 -0800
Committer: Sergey Edunov <ed...@fb.com>
Committed: Wed Mar 1 14:05:54 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/giraph/bsp/BspService.java | 3 ++-
.../org/apache/giraph/conf/GiraphConstants.java | 23 ++++++++++++++++++++
.../apache/giraph/master/BspServiceMaster.java | 8 +++++--
.../apache/giraph/worker/BspServiceWorker.java | 16 ++++++++++----
.../java/org/apache/giraph/zk/BspEvent.java | 5 +++--
.../org/apache/giraph/zk/PredicateLock.java | 6 ++++-
.../org/apache/giraph/zk/TestPredicateLock.java | 6 ++---
7 files changed, 53 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/18c67ca3/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index a2caf81..976997f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -281,7 +281,8 @@ public abstract class BspService<I extends WritableComparable,
conf.getZookeeperOpsRetryWaitMsecs(),
this,
context);
- connectedEvent.waitForever();
+ connectedEvent.waitForTimeoutOrFail(
+ GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(conf));
this.fs = FileSystem.get(getConfiguration());
} catch (IOException e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/giraph/blob/18c67ca3/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index c7b04d8..3a3e8dc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.OutputFormat;
+import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -1238,5 +1239,27 @@ public interface GiraphConstants {
BooleanConfOption PREFER_IP_ADDRESSES =
new BooleanConfOption("giraph.preferIP", false,
"Prefer IP addresses instead of host names");
+
+ /**
+ * Timeout for "waitForever"-style loops that wait on ZooKeeper.
+ * We should never really have to wait forever, so we only wait
+ * for some reasonable but large amount of time.
+ */
+ LongConfOption WAIT_ZOOKEEPER_TIMEOUT_MSEC =
+ new LongConfOption("giraph.waitZookeeperTimeoutMsec",
+ MINUTES.toMillis(15),
+ "How long should we stay in waitForever loops in various " +
+ "places that require network connection");
+
+ /**
+ * Timeout for "waitForever"-style loops that wait for other workers
+ * to complete their job.
+ * We should never really have to wait forever, so we only wait
+ * for some reasonable but large amount of time.
+ */
+ LongConfOption WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC =
+ new LongConfOption("giraph.waitForOtherWorkersMsec",
+ HOURS.toMillis(48),
+ "How long should workers wait to finish superstep");
}
// CHECKSTYLE: resume InterfaceIsTypeCheck
http://git-wip-us.apache.org/repos/asf/giraph/blob/18c67ca3/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 971e266..6b64cf5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -863,7 +863,9 @@ public class BspServiceMaster<I extends WritableComparable,
return isMaster;
}
LOG.info("becomeMaster: Waiting to become the master...");
- getMasterElectionChildrenChangedEvent().waitForever();
+ getMasterElectionChildrenChangedEvent().waitForTimeoutOrFail(
+ GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(
+ getConfiguration()));
getMasterElectionChildrenChangedEvent().reset();
} catch (KeeperException e) {
throw new IllegalStateException(
@@ -1832,7 +1834,9 @@ public class BspServiceMaster<I extends WritableComparable,
return;
}
- getCleanedUpChildrenChangedEvent().waitForever();
+ getCleanedUpChildrenChangedEvent().waitForTimeoutOrFail(
+ GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(
+ getConfiguration()));
getCleanedUpChildrenChangedEvent().reset();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/18c67ca3/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 5b00eb7..b6b9c12 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -448,7 +448,9 @@ public class BspServiceWorker<I extends WritableComparable,
if (inputSplitsDoneStat != null) {
break;
}
- getInputSplitsAllDoneEvent().waitForever();
+ getInputSplitsAllDoneEvent().waitForTimeoutOrFail(
+ GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
+ getConfiguration()));
getInputSplitsAllDoneEvent().reset();
}
}
@@ -647,7 +649,9 @@ else[HADOOP_NON_SECURE]*/
"from previous failure): " + myHealthPath +
". Waiting for change in attempts " +
"to re-join the application");
- getApplicationAttemptChangedEvent().waitForever();
+ getApplicationAttemptChangedEvent().waitForTimeoutOrFail(
+ GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(
+ getConfiguration()));
if (LOG.isInfoEnabled()) {
LOG.info("registerHealth: Got application " +
"attempt changed event, killing self");
@@ -868,7 +872,9 @@ else[HADOOP_NON_SECURE]*/
private void waitForOtherWorkers(String superstepFinishedNode) {
try {
while (getZkExt().exists(superstepFinishedNode, true) == null) {
- getSuperstepFinishedEvent().waitForever();
+ getSuperstepFinishedEvent().waitForTimeoutOrFail(
+ GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
+ getConfiguration()));
getSuperstepFinishedEvent().reset();
}
} catch (KeeperException e) {
@@ -1683,7 +1689,9 @@ else[HADOOP_NON_SECURE]*/
LOG.info("exchangeVertexPartitions: Waiting for workers " +
workerIdSet);
}
- getPartitionExchangeChildrenChangedEvent().waitForever();
+ getPartitionExchangeChildrenChangedEvent().waitForTimeoutOrFail(
+ GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
+ getConfiguration()));
getPartitionExchangeChildrenChangedEvent().reset();
}
} catch (KeeperException | InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/18c67ca3/giraph-core/src/main/java/org/apache/giraph/zk/BspEvent.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/BspEvent.java b/giraph-core/src/main/java/org/apache/giraph/zk/BspEvent.java
index 7d22f9a..d3b8e30 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/BspEvent.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/BspEvent.java
@@ -43,7 +43,8 @@ public interface BspEvent {
boolean waitMsecs(int msecs);
/**
- * Wait indefinitely until the event occurs.
+ * Waits until the event occurs, or fails with a runtime exception on timeout.
+ * @param timeout Maximum wait in milliseconds; throws if waiting takes longer.
*/
- void waitForever();
+ void waitForTimeoutOrFail(long timeout);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/18c67ca3/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java b/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java
index 4937f82..982c417 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java
@@ -134,8 +134,12 @@ public class PredicateLock implements BspEvent {
}
@Override
- public void waitForever() {
+ public void waitForTimeoutOrFail(long timeout) {
+ long t0 = System.currentTimeMillis();
while (!waitMsecs(msecPeriod)) {
+ if (System.currentTimeMillis() > t0 + timeout) {
+ throw new RuntimeException("Timeout waiting");
+ }
progressable.progress();
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/18c67ca3/giraph-core/src/test/java/org/apache/giraph/zk/TestPredicateLock.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/zk/TestPredicateLock.java b/giraph-core/src/test/java/org/apache/giraph/zk/TestPredicateLock.java
index c90be93..33b7071 100644
--- a/giraph-core/src/test/java/org/apache/giraph/zk/TestPredicateLock.java
+++ b/giraph-core/src/test/java/org/apache/giraph/zk/TestPredicateLock.java
@@ -27,8 +27,6 @@ import static org.mockito.Mockito.when;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.giraph.time.Time;
-import org.apache.giraph.zk.BspEvent;
-import org.apache.giraph.zk.PredicateLock;
import org.apache.hadoop.util.Progressable;
import org.junit.Before;
import org.junit.Test;
@@ -111,14 +109,14 @@ public class TestPredicateLock {
}
/**
- * Thread signaled test for {@link PredicateLock#waitForever()}
+ * Thread signaled test for {@link PredicateLock#waitForTimeoutOrFail(long)}
*/
@Test
public void testWaitForever() {
BspEvent event = new PredicateLock(getStubProgressable());
Thread signalThread = new SignalThread(event);
signalThread.start();
- event.waitForever();
+ event.waitForTimeoutOrFail(5 * 60_000);
try {
signalThread.join();
} catch (InterruptedException e) {