You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2017/10/06 23:32:10 UTC

samza git commit: SAMZA-272, SAMZA-1440, SAMZA-1269: Fixed thread interrupt tests in TestExponentialSleepStrategy.

Repository: samza
Updated Branches:
  refs/heads/master 357d6ca72 -> f8cce6e15


SAMZA-272, SAMZA-1440, SAMZA-1269: Fixed thread interrupt tests in TestExponentialSleepStrategy.

Author: Prateek Maheshwari <pm...@linkedin.com>

Reviewers: Jacob Maes <jm...@apache.org>

Closes #318 from prateekm/ess-test-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f8cce6e1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f8cce6e1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f8cce6e1

Branch: refs/heads/master
Commit: f8cce6e1571dd500f32b62fa510715d93e8986fb
Parents: 357d6ca
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Fri Oct 6 16:32:06 2017 -0700
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Fri Oct 6 16:32:06 2017 -0700

----------------------------------------------------------------------
 .../samza/util/ExponentialSleepStrategy.scala   |  3 +-
 .../util/TestExponentialSleepStrategy.scala     | 46 +++++++++++++-------
 2 files changed, 32 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f8cce6e1/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
index 4a04c13..da55371 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
@@ -73,7 +73,8 @@ class ExponentialSleepStrategy(
    * @param loopOperation The operation that should be attempted and may fail.
    * @param onException Handler function that determines what to do with an exception.
    * @return If loopOperation succeeded, an option containing the return value of
-   *         the last invocation. If done was called in the exception hander, None.
+   *         the last invocation. If done was called in the exception handler or the
+    *        thread was interrupted, None.
    */
   def run[A](loopOperation: RetryLoop => A, onException: (Exception, RetryLoop) => Unit): Option[A] = {
     val loop = startLoop

http://git-wip-us.apache.org/repos/asf/samza/blob/f8cce6e1/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
index 546f41b..0514f8c 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
@@ -19,6 +19,8 @@
 
 package org.apache.samza.util
 
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
 import org.apache.samza.util.ExponentialSleepStrategy.RetryLoop
 import org.junit.Assert._
 import org.junit.Test
@@ -116,45 +118,57 @@ class TestExponentialSleepStrategy {
     assertEquals(0, loopObject.sleepCount)
   }
 
-  def interruptedThread(operation: => Unit) = {
+  def interruptedThread(operationStartLatch: CountDownLatch, operation: => Unit): Option[Throwable] = {
     var exception: Option[Throwable] = None
     val interruptee = new Thread(new Runnable {
       def run {
         try { operation } catch { case e: Exception => exception = Some(e) }
       }
     })
-    interruptee.start
-    Thread.sleep(10) // give the thread a chance to make some progress before we interrupt it
-    interruptee.interrupt
-    interruptee.join
+    interruptee.start()
+    assertTrue("Operation start latch timed out.", operationStartLatch.await(1, TimeUnit.MINUTES))
+    interruptee.interrupt()
+    interruptee.join()
     exception
   }
 
-  // TODO fix in SAMZA-1269
-  // @Test
-  def testThreadInterruptInRetryLoop {
+  @Test def testThreadInterruptInRetryLoop {
     val strategy = new ExponentialSleepStrategy
     var iterations = 0
     var loopObject: RetryLoop = null
-    val exception = interruptedThread {
+    val loopStartLatch = new CountDownLatch(1) // ensures that we've executed the operation at least once
+    val exception = interruptedThread(
+      loopStartLatch,
       strategy.run(
-        loop => { iterations += 1; loopObject = loop },
+        loop => { loopObject = loop; loopStartLatch.countDown(); iterations += 1; },
         (exception, loop) => throw exception
       )
-    }
-    assertEquals(classOf[InterruptedException], exception.get.getClass)
+    )
+
+    // The interrupt can cause either,
+    // 1. the retry loop to exit with None result, no exception and isDone == false, or
+    // 2. the sleeping thread (during the back-off) to throw an InterruptedException.
+    assertTrue((!loopObject.isDone && exception.isEmpty) ||
+      exception.get.getClass.equals(classOf[InterruptedException]))
   }
 
   @Test def testThreadInterruptInOperationSleep {
     val strategy = new ExponentialSleepStrategy
     var iterations = 0
     var loopObject: RetryLoop = null
-    val exception = interruptedThread {
+    val loopStartLatch = new CountDownLatch(1) // ensures that we've executed the operation at least once
+    val exception = interruptedThread(
+      loopStartLatch,
       strategy.run(
-        loop => { iterations += 1; loopObject = loop; Thread.sleep(1000) },
+        loop => { loopObject = loop; iterations += 1; loopStartLatch.countDown(); Thread.sleep(1000) },
         (exception, loop) => throw exception
       )
-    }
-    assertEquals(classOf[InterruptedException], exception.get.getClass)
+    )
+
+    // The interrupt can cause either,
+    // 1. the retry loop to exit with None result, no exception and isDone == false, or
+    // 2. the sleeping thread (in the operation or during the back-off) to throw an InterruptedException.
+    assertTrue((!loopObject.isDone && exception.isEmpty) ||
+      exception.get.getClass.equals(classOf[InterruptedException]))
   }
 }