You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2015/06/15 20:49:37 UTC
samza git commit: SAMZA-572 - SamzaContainer checkpoints and windows
immediately on startup
Repository: samza
Updated Branches:
refs/heads/master 00c8abd7c -> b1586413c
SAMZA-572 - SamzaContainer checkpoints and windows immediately on startup
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b1586413
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b1586413
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b1586413
Branch: refs/heads/master
Commit: b1586413c85ab6a03cd00fafa7d38aaa080c62dc
Parents: 00c8abd
Author: József Márton Jung <j....@levi9.com>
Authored: Mon Jun 15 11:48:18 2015 -0700
Committer: Navina <na...@gmail.com>
Committed: Mon Jun 15 11:48:18 2015 -0700
----------------------------------------------------------------------
.../org/apache/samza/container/RunLoop.scala | 4 +-
.../apache/samza/container/TestRunLoop.scala | 44 +++++++++++++++++---
2 files changed, 40 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/b1586413/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index e0c3e7e..c292ae4 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -42,8 +42,8 @@ class RunLoop(
val clock: () => Long = { System.currentTimeMillis },
val shutdownMs: Long = 5000) extends Runnable with TimerUtils with Logging {
- private var lastWindowMs = 0L
- private var lastCommitMs = 0L
+ private var lastWindowMs = clock()
+ private var lastCommitMs = clock()
private var activeMs = 0L
private var taskShutdownRequests: Set[TaskName] = Set()
private var taskCommitRequests: Set[TaskName] = Set()
http://git-wip-us.apache.org/repos/asf/samza/blob/b1586413/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index cec6477..64a5844 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -101,10 +101,10 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
intercept[StopRunLoop] { runLoop.run }
- verify(runLoop.taskInstances(taskName0), times(5)).window(anyObject)
- verify(runLoop.taskInstances(taskName1), times(5)).window(anyObject)
- verify(runLoop.taskInstances(taskName0), times(10)).commit
- verify(runLoop.taskInstances(taskName1), times(10)).commit
+ verify(runLoop.taskInstances(taskName0), times(4)).window(anyObject)
+ verify(runLoop.taskInstances(taskName1), times(4)).window(anyObject)
+ verify(runLoop.taskInstances(taskName0), times(9)).commit
+ verify(runLoop.taskInstances(taskName1), times(9)).commit
}
@Test
@@ -201,7 +201,7 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
testMetrics.chooseMs.getSnapshot.getAverage should equal(1L)
testMetrics.windowMs.getSnapshot.getAverage should equal(3L)
testMetrics.processMs.getSnapshot.getAverage should equal(3L)
- testMetrics.commitMs.getSnapshot.getAverage should equal(3L)
+ testMetrics.commitMs.getSnapshot.getAverage should equal(0L)
now = 0L
intercept[StopRunLoop] { runLoop.run }
@@ -209,6 +209,38 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
testMetrics.chooseMs.getSnapshot.getSize should equal(2)
testMetrics.windowMs.getSnapshot.getSize should equal(2)
testMetrics.processMs.getSnapshot.getSize should equal(2)
- testMetrics.commitMs.getSnapshot.getSize should equal(2)
+ testMetrics.commitMs.getSnapshot.getSize should equal(1)
+ }
+
+ @Test
+ def testCommitAndWindowNotCalledImmediatelyOnStartUp {
+ var now = 0L
+ val consumers = mock[SystemConsumers]
+ val testMetrics = new SamzaContainerMetrics
+ val runLoop = new RunLoop(
+ taskInstances = getMockTaskInstances,
+ consumerMultiplexer = consumers,
+ metrics = testMetrics,
+ commitMs = 1L,
+ windowMs = 1L,
+ clock = () => {
+ now += 1L
+ if (now == 13L) throw new StopRunLoop
+ now
+ }
+ )
+
+ intercept[StopRunLoop] {
+ runLoop.run
+ }
+ now = 0L
+ intercept[StopRunLoop] {
+ runLoop.run
+ }
+
+ // after 2 run loops number of commits and windows should be 1,
+ // as commit and window should not be called immediately on startup
+ testMetrics.commits.getCount should equal(1L)
+ testMetrics.windows.getCount should equal(1L)
}
}