You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2017/10/06 23:32:10 UTC
samza git commit: SAMZA-272, SAMZA-1440,
SAMZA-1269: Fixed thread interrupt tests in
TestExponentialSleepStrategy.
Repository: samza
Updated Branches:
refs/heads/master 357d6ca72 -> f8cce6e15
SAMZA-272, SAMZA-1440, SAMZA-1269: Fixed thread interrupt tests in TestExponentialSleepStrategy.
Author: Prateek Maheshwari <pm...@linkedin.com>
Reviewers: Jacob Maes <jm...@apache.org>
Closes #318 from prateekm/ess-test-fix
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f8cce6e1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f8cce6e1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f8cce6e1
Branch: refs/heads/master
Commit: f8cce6e1571dd500f32b62fa510715d93e8986fb
Parents: 357d6ca
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Fri Oct 6 16:32:06 2017 -0700
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Fri Oct 6 16:32:06 2017 -0700
----------------------------------------------------------------------
.../samza/util/ExponentialSleepStrategy.scala | 3 +-
.../util/TestExponentialSleepStrategy.scala | 46 +++++++++++++-------
2 files changed, 32 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/f8cce6e1/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
index 4a04c13..da55371 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
@@ -73,7 +73,8 @@ class ExponentialSleepStrategy(
* @param loopOperation The operation that should be attempted and may fail.
* @param onException Handler function that determines what to do with an exception.
* @return If loopOperation succeeded, an option containing the return value of
- * the last invocation. If done was called in the exception hander, None.
+ * the last invocation. If done was called in the exception handler or the
+ * thread was interrupted, None.
*/
def run[A](loopOperation: RetryLoop => A, onException: (Exception, RetryLoop) => Unit): Option[A] = {
val loop = startLoop
http://git-wip-us.apache.org/repos/asf/samza/blob/f8cce6e1/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
index 546f41b..0514f8c 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
@@ -19,6 +19,8 @@
package org.apache.samza.util
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
import org.apache.samza.util.ExponentialSleepStrategy.RetryLoop
import org.junit.Assert._
import org.junit.Test
@@ -116,45 +118,57 @@ class TestExponentialSleepStrategy {
assertEquals(0, loopObject.sleepCount)
}
- def interruptedThread(operation: => Unit) = {
+ def interruptedThread(operationStartLatch: CountDownLatch, operation: => Unit): Option[Throwable] = {
var exception: Option[Throwable] = None
val interruptee = new Thread(new Runnable {
def run {
try { operation } catch { case e: Exception => exception = Some(e) }
}
})
- interruptee.start
- Thread.sleep(10) // give the thread a chance to make some progress before we interrupt it
- interruptee.interrupt
- interruptee.join
+ interruptee.start()
+ assertTrue("Operation start latch timed out.", operationStartLatch.await(1, TimeUnit.MINUTES))
+ interruptee.interrupt()
+ interruptee.join()
exception
}
- // TODO fix in SAMZA-1269
- // @Test
- def testThreadInterruptInRetryLoop {
+ @Test def testThreadInterruptInRetryLoop {
val strategy = new ExponentialSleepStrategy
var iterations = 0
var loopObject: RetryLoop = null
- val exception = interruptedThread {
+ val loopStartLatch = new CountDownLatch(1) // ensures that we've executed the operation at least once
+ val exception = interruptedThread(
+ loopStartLatch,
strategy.run(
- loop => { iterations += 1; loopObject = loop },
+ loop => { loopObject = loop; loopStartLatch.countDown(); iterations += 1; },
(exception, loop) => throw exception
)
- }
- assertEquals(classOf[InterruptedException], exception.get.getClass)
+ )
+
+ // The interrupt can cause either,
+ // 1. the retry loop to exit with None result, no exception and isDone == false, or
+ // 2. the sleeping thread (during the back-off) to throw an InterruptedException.
+ assertTrue((!loopObject.isDone && exception.isEmpty) ||
+ exception.get.getClass.equals(classOf[InterruptedException]))
}
@Test def testThreadInterruptInOperationSleep {
val strategy = new ExponentialSleepStrategy
var iterations = 0
var loopObject: RetryLoop = null
- val exception = interruptedThread {
+ val loopStartLatch = new CountDownLatch(1) // ensures that we've executed the operation at least once
+ val exception = interruptedThread(
+ loopStartLatch,
strategy.run(
- loop => { iterations += 1; loopObject = loop; Thread.sleep(1000) },
+ loop => { loopObject = loop; iterations += 1; loopStartLatch.countDown(); Thread.sleep(1000) },
(exception, loop) => throw exception
)
- }
- assertEquals(classOf[InterruptedException], exception.get.getClass)
+ )
+
+ // The interrupt can cause either,
+ // 1. the retry loop to exit with None result, no exception and isDone == false, or
+ // 2. the sleeping thread (in the operation or during the back-off) to throw an InterruptedException.
+ assertTrue((!loopObject.isDone && exception.isEmpty) ||
+ exception.get.getClass.equals(classOf[InterruptedException]))
}
}