You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/08 15:30:13 UTC
[2/3] git commit: [FLINK-909] Remove additional empty (and non empty
for iterative broadcast variables) superstep. [FLINK-945] Fix early memory
release in iterations
[FLINK-909] Remove additional empty (and non empty for iterative broadcast variables) superstep.
[FLINK-945] Fix early memory release in iterations
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/38cbf0b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/38cbf0b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/38cbf0b9
Branch: refs/heads/release-0.6
Commit: 38cbf0b915dea175f287d7c2a627467a02d28474
Parents: 4046e6a
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 7 22:00:51 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Fri Aug 8 15:27:30 2014 +0200
----------------------------------------------------------------------
.../concurrent/SuperstepKickoffLatch.java | 65 +++++
.../concurrent/SuperstepKickoffLatchBroker.java | 32 +++
.../task/AbstractIterativePactTask.java | 41 +--
.../iterative/task/IterationHeadPactTask.java | 21 +-
.../task/IterationIntermediatePactTask.java | 31 ++-
.../iterative/task/IterationTailPactTask.java | 38 +--
.../concurrent/SuperstepKickoffLatchTest.java | 247 +++++++++++++++++++
7 files changed, 404 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/38cbf0b9/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
new file mode 100644
index 0000000..b53928c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.iterative.concurrent;
+
+public class SuperstepKickoffLatch {
+ // Monitor-based latch: waiters block until a requested superstep number is reached or termination is signalled.
+ private final Object monitor = new Object(); // lock object; all state below is read and written only while holding it
+ // Number of the current superstep; starts at 1 and is only ever incremented (see triggerNextSuperstep()).
+ private int superstepNumber = 1;
+ // Becomes true in signalTermination(); nothing in this class resets it.
+ private boolean terminated;
+ /** Advances the latch to the next superstep and wakes all waiting threads; throws IllegalStateException if termination was already signalled. */
+ public void triggerNextSuperstep() {
+ synchronized (monitor) {
+ if (terminated) {
+ throw new IllegalStateException("Already teriminated."); // NOTE(review): message text contains a typo ("teriminated"); left unchanged here
+ }
+ superstepNumber++;
+ monitor.notifyAll();
+ }
+ }
+ /** Marks the latch as terminated and wakes all waiting threads. */
+ public void signalTermination() {
+ synchronized (monitor) {
+ terminated = true;
+ monitor.notifyAll();
+ }
+ }
+ /** Blocks until the given superstep has started or termination was signalled; returns true on termination, false once the superstep is reached. */
+ public boolean awaitStartOfSuperstepOrTermination(int superstep) throws InterruptedException {
+ while (true) { // state is re-evaluated after every wake-up (notification or wait timeout)
+ synchronized (monitor) {
+ if (terminated) { // termination is checked first, so it wins over any superstep comparison
+ return true;
+ }
+ else if (superstepNumber == superstep) {
+ // reached the superstep. all good!
+ return false;
+ }
+ else if (superstepNumber == superstep - 1) { // exactly one step behind the requested superstep: keep waiting
+ monitor.wait(2000); // timed wait (2000 ms); the loop then re-checks the state
+ }
+ else { // requested superstep is neither the current one nor the next one: treated as an error
+ throw new IllegalStateException("Error while waiting for start of next superstep. current= " + superstepNumber + " waitingFor=" + superstep);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/38cbf0b9/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
new file mode 100644
index 0000000..41f6985
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.iterative.concurrent;
+
+public class SuperstepKickoffLatchBroker extends Broker<SuperstepKickoffLatch> {
+ // Broker specialization for SuperstepKickoffLatch objects, exposed as a single shared instance.
+ private static final SuperstepKickoffLatchBroker INSTANCE = new SuperstepKickoffLatchBroker();
+ // Private constructor: the only instance created in this class is INSTANCE.
+ private SuperstepKickoffLatchBroker() {}
+
+ /** Returns the shared broker instance. */
+ public static Broker<SuperstepKickoffLatch> instance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/38cbf0b9/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
index 636c492..4c03278 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
@@ -210,28 +210,17 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
return this.iterationAggregators;
}
- protected void checkForTerminationAndResetEndOfSuperstepState() throws IOException {
+ protected void verifyEndOfSuperstepState() throws IOException {
// sanity check that there is at least one iterative input reader
if (this.iterativeInputs.length == 0 && this.iterativeBroadcastInputs.length == 0) {
- throw new IllegalStateException();
+ throw new IllegalStateException("Error: Iterative task without a single iterative input.");
}
- // check whether this step ended due to end-of-superstep, or proper close
- boolean anyClosed = false;
- boolean allClosed = true;
-
for (int inputNum : this.iterativeInputs) {
MutableReader<?> reader = this.inputReaders[inputNum];
- if (reader.isInputClosed()) {
- anyClosed = true;
- }
- else {
- // check if reader has reached the end of superstep, or if the operation skipped out early
+ if (!reader.isInputClosed()) {
if (reader.hasReachedEndOfSuperstep()) {
- allClosed = false;
-
- // also reset the end-of-superstep state
reader.startNextSuperstep();
}
else {
@@ -241,11 +230,7 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
Object o = this.inputSerializers[inputNum].getSerializer().createInstance();
while ((o = inIter.next(o)) != null);
- if (reader.isInputClosed()) {
- anyClosed = true;
- } else {
- allClosed = false;
-
+ if (!reader.isInputClosed()) {
// also reset the end-of-superstep state
reader.startNextSuperstep();
}
@@ -256,28 +241,16 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
for (int inputNum : this.iterativeBroadcastInputs) {
MutableReader<?> reader = this.broadcastInputReaders[inputNum];
- if (reader.isInputClosed()) {
- anyClosed = true;
- }
- else {
- // sanity check that the BC input is at the end of teh superstep
+ if (!reader.isInputClosed()) {
+
+ // sanity check that the BC input is at the end of the superstep
if (!reader.hasReachedEndOfSuperstep()) {
throw new IllegalStateException("An iterative broadcast input has not been fully consumed.");
}
- allClosed = false;
reader.startNextSuperstep();
}
}
-
- // sanity check whether we saw the same state (end-of-superstep or termination) on all inputs
- if (allClosed != anyClosed) {
- throw new IllegalStateException("Inconsistent state: Iteration termination received on some, but not all inputs.");
- }
-
- if (allClosed) {
- requestTermination();
- }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/38cbf0b9/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
index 797bbb6..3dbd47c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
@@ -41,6 +41,8 @@ import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
import org.apache.flink.runtime.iterative.concurrent.SuperstepBarrier;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
import org.apache.flink.runtime.iterative.event.TerminationEvent;
import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
@@ -134,8 +136,7 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
private BlockingBackChannel initBackChannel() throws Exception {
/* get the size of the memory available to the backchannel */
- int backChannelMemoryPages = getMemoryManager().computeNumberOfPages(this.config.getRelativeBackChannelMemory
- ());
+ int backChannelMemoryPages = getMemoryManager().computeNumberOfPages(this.config.getRelativeBackChannelMemory());
/* allocate the memory available to the backchannel */
List<MemorySegment> segments = new ArrayList<MemorySegment>();
@@ -220,6 +221,9 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
try {
/* used for receiving the current iteration result from iteration tail */
+ SuperstepKickoffLatch nextStepKickoff = new SuperstepKickoffLatch();
+ SuperstepKickoffLatchBroker.instance().handIn(brokerKey, nextStepKickoff);
+
BlockingBackChannel backChannel = initBackChannel();
SuperstepBarrier barrier = initSuperstepBarrier();
SolutionSetUpdateBarrier solutionSetUpdateBarrier = null;
@@ -316,12 +320,15 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
+ "]"));
}
requestTermination();
+ nextStepKickoff.signalTermination();
} else {
incrementIterationCounter();
String[] globalAggregateNames = barrier.getAggregatorNames();
Value[] globalAggregates = barrier.getAggregates();
aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, globalAggregates);
+
+ nextStepKickoff.triggerNextSuperstep();
}
}
@@ -344,12 +351,10 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
// - solution set index
IterationAggregatorBroker.instance().remove(brokerKey);
BlockingBackChannelBroker.instance().remove(brokerKey);
- if (isWorksetIteration) {
- SolutionSetBroker.instance().remove(brokerKey);
- if (waitForSolutionSetUpdate) {
- SolutionSetUpdateBarrierBroker.instance().remove(brokerKey);
- }
- }
+ SuperstepKickoffLatchBroker.instance().remove(brokerKey);
+ SolutionSetBroker.instance().remove(brokerKey);
+ SolutionSetUpdateBarrierBroker.instance().remove(brokerKey);
+
if (solutionSet != null) {
solutionSet.close();
solutionSet = null;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/38cbf0b9/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
index c23eae1..b12e70b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.functions.Function;
import org.apache.flink.runtime.io.network.api.BufferWriter;
import org.apache.flink.runtime.io.network.channels.EndOfSuperstepEvent;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
import org.apache.flink.runtime.iterative.event.TerminationEvent;
import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
import org.apache.flink.util.Collector;
@@ -78,6 +80,8 @@ public class IterationIntermediatePactTask<S extends Function, OT> extends Abstr
@Override
public void run() throws Exception {
+
+ SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
while (this.running && !terminationRequested()) {
@@ -88,26 +92,31 @@ public class IterationIntermediatePactTask<S extends Function, OT> extends Abstr
super.run();
// check if termination was requested
- checkForTerminationAndResetEndOfSuperstepState();
+ verifyEndOfSuperstepState();
if (isWorksetUpdate && isWorksetIteration) {
long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
worksetAggregator.aggregate(numCollected);
}
-
+
if (log.isInfoEnabled()) {
log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
}
+
+ // let the successors know that the end of this superstep data is reached
+ sendEndOfSuperstep();
+
+ if (isWorksetUpdate) {
+ // notify iteration head if responsible for workset update
+ worksetBackChannel.notifyOfEndOfSuperstep();
+ }
+
+ boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
- if (!terminationRequested()) {
- if (isWorksetUpdate) {
- // notify iteration head if responsible for workset update
- worksetBackChannel.notifyOfEndOfSuperstep();
- }
-
- // send the end-of-superstep
- sendEndOfSuperstep();
-
+ if (terminated) {
+ requestTermination();
+ }
+ else {
incrementIterationCounter();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/38cbf0b9/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
index 90d732c..0d9c903 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
@@ -23,6 +23,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
import org.apache.flink.runtime.operators.PactTaskContext;
import org.apache.flink.util.Collector;
@@ -95,23 +97,19 @@ public class IterationTailPactTask<S extends Function, OT> extends AbstractItera
@Override
public void run() throws Exception {
+
+ SuperstepKickoffLatch nextSuperStepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
+
while (this.running && !terminationRequested()) {
if (log.isInfoEnabled()) {
log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
}
- try {
- super.run();
- }
- catch (NullPointerException e) {
- boolean terminationRequested = terminationRequested();
- System.out.println("Nullpoint exception when termination requested was " + terminationRequested);
- e.printStackTrace();
- }
+ super.run();
// check if termination was requested
- checkForTerminationAndResetEndOfSuperstepState();
+ verifyEndOfSuperstepState();
if (isWorksetUpdate && isWorksetIteration) {
// aggregate workset update element count
@@ -123,16 +121,20 @@ public class IterationTailPactTask<S extends Function, OT> extends AbstractItera
if (log.isInfoEnabled()) {
log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
}
+
+ if (isWorksetUpdate) {
+ // notify iteration head if responsible for workset update
+ worksetBackChannel.notifyOfEndOfSuperstep();
+ } else if (isSolutionSetUpdate) {
+ // notify iteration head if responsible for solution set update
+ solutionSetUpdateBarrier.notifySolutionSetUpdate();
+ }
- if (!terminationRequested()) {
- if (isWorksetUpdate) {
- // notify iteration head if responsible for workset update
- worksetBackChannel.notifyOfEndOfSuperstep();
- } else if (isSolutionSetUpdate) {
- // notify iteration head if responsible for solution set update
- solutionSetUpdateBarrier.notifySolutionSetUpdate();
- }
-
+ boolean terminate = nextSuperStepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
+ if (terminate) {
+ requestTermination();
+ }
+ else {
incrementIterationCounter();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/38cbf0b9/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
new file mode 100644
index 0000000..3173570
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.iterative.concurrent;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SuperstepKickoffLatchTest { // tests for SuperstepKickoffLatch: waiting, triggering and termination
+ // A waiter for superstep 2 is started on a fresh latch; the superstep is triggered about 100 ms later and no error is expected.
+ @Test
+ public void testWaitFromOne() {
+ try {
+ SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
+
+ Waiter w = new Waiter(latch, 2);
+ Thread waiter = new Thread(w);
+ waiter.setDaemon(true);
+ waiter.start();
+
+ WatchDog wd = new WatchDog(waiter, 2000); // records a "timed out" error if the waiter is still alive after 2000 ms
+ wd.start();
+
+ Thread.sleep(100);
+
+ latch.triggerNextSuperstep();
+
+ wd.join();
+ if (wd.getError() != null) {
+ throw wd.getError();
+ }
+
+ if (w.getError() != null) {
+ throw w.getError();
+ }
+ }
+ catch (Throwable t) {
+ t.printStackTrace();
+ Assert.fail("Error: " + t.getMessage());
+ }
+ }
+ // The latch is advanced to superstep 2 before the waiter for superstep 2 starts; no error is expected.
+ @Test
+ public void testWaitAlreadyFulfilled() {
+ try {
+ SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
+ latch.triggerNextSuperstep();
+
+ Waiter w = new Waiter(latch, 2);
+ Thread waiter = new Thread(w);
+ waiter.setDaemon(true);
+ waiter.start();
+
+ WatchDog wd = new WatchDog(waiter, 2000);
+ wd.start();
+
+ Thread.sleep(100);
+
+ wd.join();
+ if (wd.getError() != null) {
+ throw wd.getError();
+ }
+
+ if (w.getError() != null) {
+ throw w.getError();
+ }
+ }
+ catch (Throwable t) {
+ t.printStackTrace();
+ Assert.fail("Error: " + t.getMessage());
+ }
+ }
+ // The latch is advanced twice (to superstep 3); waiting for superstep 2 on the calling thread must throw IllegalStateException.
+ @Test
+ public void testWaitIncorrect() {
+ try {
+ SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
+ latch.triggerNextSuperstep();
+ latch.triggerNextSuperstep();
+
+ try {
+ latch.awaitStartOfSuperstepOrTermination(2);
+ Assert.fail("should throw exception");
+ }
+ catch (IllegalStateException e) {
+ // good
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Error: " + e.getMessage());
+ }
+ }
+ // Same scenario as testWaitIncorrect, but the wait runs in a separate thread; the error recorded by the Waiter must be an IllegalStateException.
+ @Test
+ public void testWaitIncorrectAsync() {
+ try {
+ SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
+ latch.triggerNextSuperstep();
+ latch.triggerNextSuperstep();
+
+ Waiter w = new Waiter(latch, 2);
+ Thread waiter = new Thread(w);
+ waiter.setDaemon(true);
+ waiter.start();
+
+ WatchDog wd = new WatchDog(waiter, 2000);
+ wd.start();
+
+ Thread.sleep(100);
+
+ wd.join();
+ if (wd.getError() != null) {
+ throw wd.getError();
+ }
+
+ if (w.getError() != null) {
+ if (!(w.getError() instanceof IllegalStateException)) {
+ throw new Exception("wrong exception type " + w.getError());
+ }
+ } else {
+ Assert.fail("should cause exception");
+ }
+ }
+ catch (Throwable t) {
+ t.printStackTrace();
+ Assert.fail("Error: " + t.getMessage());
+ }
+ }
+ // The latch is advanced twice (to superstep 3), a waiter for superstep 4 is started, then termination is signalled; no error is expected.
+ @Test
+ public void testWaitForTermination() {
+ try {
+ SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
+ latch.triggerNextSuperstep();
+ latch.triggerNextSuperstep();
+
+ Waiter w = new Waiter(latch, 4);
+ Thread waiter = new Thread(w);
+ waiter.setDaemon(true);
+ waiter.start();
+
+ WatchDog wd = new WatchDog(waiter, 2000);
+ wd.start();
+
+ latch.signalTermination();
+
+ wd.join();
+ if (wd.getError() != null) {
+ throw wd.getError();
+ }
+
+ if (w.getError() != null) {
+ throw w.getError();
+ }
+ }
+ catch (Throwable t) {
+ t.printStackTrace();
+ Assert.fail("Error: " + t.getMessage());
+ }
+ }
+ // Runnable that waits on the latch for the given superstep and records any Throwable raised by the wait.
+ private static class Waiter implements Runnable {
+
+ private final SuperstepKickoffLatch latch;
+
+ private final int waitFor;
+
+ private volatile Throwable error; // written by the waiting thread, read by the test via getError()
+
+
+ public Waiter(SuperstepKickoffLatch latch, int waitFor) {
+ this.latch = latch;
+ this.waitFor = waitFor;
+ }
+
+ @Override
+ public void run() {
+ try {
+ latch.awaitStartOfSuperstepOrTermination(waitFor); // the boolean result is ignored; only failures are recorded
+ }
+ catch (Throwable t) {
+ this.error = t;
+ }
+ }
+
+ public Throwable getError() {
+ return error;
+ }
+ }
+ // Daemon thread that waits for another thread up to a timeout and records an error if that thread is still alive afterwards.
+ private static class WatchDog extends Thread {
+
+ private final Thread toWatch;
+
+ private final long timeOut;
+
+ private volatile Throwable failed; // null unless the watched thread timed out or run() itself failed
+
+ public WatchDog(Thread toWatch, long timeout) {
+ setDaemon(true);
+ setName("Watchdog");
+ this.toWatch = toWatch;
+ this.timeOut = timeout;
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public void run() {
+ try {
+ toWatch.join(timeOut);
+
+ if (toWatch.isAlive()) { // still running after the timeout: record the failure, then try to end the thread
+ this.failed = new Exception("timed out");
+ toWatch.interrupt();
+
+ toWatch.join(2000); // give the interrupted thread up to 2000 ms to finish
+ if (toWatch.isAlive()) {
+ toWatch.stop(); // NOTE(review): deprecated API (hence the @SuppressWarnings above); confirm it is supported on the target JDK
+ }
+ }
+ }
+ catch (Throwable t) {
+ failed = t;
+ }
+ }
+
+ public Throwable getError() {
+ return failed;
+ }
+ }
+}