You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2015/06/15 20:49:37 UTC

samza git commit: SAMZA-572 - SamzaContainer checkpoints and windows immediately on startup

Repository: samza
Updated Branches:
  refs/heads/master 00c8abd7c -> b1586413c


SAMZA-572 - SamzaContainer checkpoints and windows immediately on startup


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b1586413
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b1586413
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b1586413

Branch: refs/heads/master
Commit: b1586413c85ab6a03cd00fafa7d38aaa080c62dc
Parents: 00c8abd
Author: József Márton Jung <j....@levi9.com>
Authored: Mon Jun 15 11:48:18 2015 -0700
Committer: Navina <na...@gmail.com>
Committed: Mon Jun 15 11:48:18 2015 -0700

----------------------------------------------------------------------
 .../org/apache/samza/container/RunLoop.scala    |  4 +-
 .../apache/samza/container/TestRunLoop.scala    | 44 +++++++++++++++++---
 2 files changed, 40 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/b1586413/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index e0c3e7e..c292ae4 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -42,8 +42,8 @@ class RunLoop(
   val clock: () => Long = { System.currentTimeMillis },
   val shutdownMs: Long = 5000) extends Runnable with TimerUtils with Logging {
 
-  private var lastWindowMs = 0L
-  private var lastCommitMs = 0L
+  private var lastWindowMs = clock()
+  private var lastCommitMs = clock()
   private var activeMs = 0L
   private var taskShutdownRequests: Set[TaskName] = Set()
   private var taskCommitRequests: Set[TaskName] = Set()

http://git-wip-us.apache.org/repos/asf/samza/blob/b1586413/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index cec6477..64a5844 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -101,10 +101,10 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
 
     intercept[StopRunLoop] { runLoop.run }
 
-    verify(runLoop.taskInstances(taskName0), times(5)).window(anyObject)
-    verify(runLoop.taskInstances(taskName1), times(5)).window(anyObject)
-    verify(runLoop.taskInstances(taskName0), times(10)).commit
-    verify(runLoop.taskInstances(taskName1), times(10)).commit
+    verify(runLoop.taskInstances(taskName0), times(4)).window(anyObject)
+    verify(runLoop.taskInstances(taskName1), times(4)).window(anyObject)
+    verify(runLoop.taskInstances(taskName0), times(9)).commit
+    verify(runLoop.taskInstances(taskName1), times(9)).commit
   }
 
   @Test
@@ -201,7 +201,7 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
     testMetrics.chooseMs.getSnapshot.getAverage should equal(1L)
     testMetrics.windowMs.getSnapshot.getAverage should equal(3L)
     testMetrics.processMs.getSnapshot.getAverage should equal(3L)
-    testMetrics.commitMs.getSnapshot.getAverage should equal(3L)
+    testMetrics.commitMs.getSnapshot.getAverage should equal(0L)
 
     now = 0L
     intercept[StopRunLoop] { runLoop.run }
@@ -209,6 +209,38 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
     testMetrics.chooseMs.getSnapshot.getSize should equal(2)
     testMetrics.windowMs.getSnapshot.getSize should equal(2)
     testMetrics.processMs.getSnapshot.getSize should equal(2)
-    testMetrics.commitMs.getSnapshot.getSize should equal(2)
+    testMetrics.commitMs.getSnapshot.getSize should equal(1)
+  }
+
+  @Test
+  def testCommitAndWindowNotCalledImmediatelyOnStartUp {
+    var now = 0L
+    val consumers = mock[SystemConsumers]
+    val testMetrics = new SamzaContainerMetrics
+    val runLoop = new RunLoop(
+      taskInstances = getMockTaskInstances,
+      consumerMultiplexer = consumers,
+      metrics = testMetrics,
+      commitMs = 1L,
+      windowMs = 1L,
+      clock = () => {
+        now += 1L
+        if (now == 13L) throw new StopRunLoop
+        now
+      }
+    )
+
+    intercept[StopRunLoop] {
+      runLoop.run
+    }
+    now = 0L
+    intercept[StopRunLoop] {
+      runLoop.run
+    }
+
+    // after 2 run loops number of commits and windows should be 1,
+    // as commit and window should not be called immediately on startup
+    testMetrics.commits.getCount should equal(1L)
+    testMetrics.windows.getCount should equal(1L)
   }
 }