You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/08/11 22:50:28 UTC

samza git commit: SAMZA-1388: Flaky test - TestStatefulTask#testShouldStartAndRestore

Repository: samza
Updated Branches:
  refs/heads/master 46b3601f1 -> b1277d8b4


SAMZA-1388: Flaky test - TestStatefulTask#testShouldStartAndRestore

I believe the problem originated from SAMZA-173.

The core issue is that testShouldRestoreStore was not updated to expect 6 messages after 2 more messages were added to testShouldStartTaskForFirstTime.

Fixed the issue and refactored the code so the 2 methods wouldn't disagree again in the future.

Author: Jacob Maes <jm...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@linkedin.com>

Closes #269 from jmakes/samza-1388


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b1277d8b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b1277d8b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b1277d8b

Branch: refs/heads/master
Commit: b1277d8b4cbda3a12f00dd2c8e86980086f2c640
Parents: 46b3601
Author: Jacob Maes <jm...@linkedin.com>
Authored: Fri Aug 11 15:50:12 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Fri Aug 11 15:50:12 2017 -0700

----------------------------------------------------------------------
 .../test/integration/TestStatefulTask.scala     | 79 +++++++-------------
 1 file changed, 29 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/b1277d8b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index e5b6756..734487b 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -30,8 +30,16 @@ import org.junit.{AfterClass, BeforeClass, Test}
 import scala.collection.JavaConverters._
 
 object TestStatefulTask {
-    val STORE_NAME = "mystore"
-    val STATE_TOPIC_STREAM = "mystoreChangelog"
+  val STORE_NAME = "mystore"
+  val STATE_TOPIC_STREAM = "mystoreChangelog"
+
+  // Messages with one dupe and one delete. A negative string means delete. See StateStoreTestTask.testProcess()
+  val MESSAGES_SEND_1 = List("1", "2", "3", "2", "99", "-99")
+  val MESSAGES_RECV_1 = List("1", "2", "3", "2", "99", null)
+  val STORE_CONTENTS_1 = List("1", "2", "3")
+
+  val MESSAGES_SEND_2 = List("4", "5", "5")
+  val MESSAGES_RECV_2 = List("4", "5", "5")
 
   @BeforeClass
   def beforeSetupServers {
@@ -47,13 +55,13 @@ object TestStatefulTask {
 /**
  * Test that does the following:
  * 1. Start a single partition of TestStateStoreTask using ThreadJobFactory.
- * 2. Send four messages to input (1,2,3,2), which contain one dupe (2).
+ * 2. Send MESSAGES_SEND_1, which contains a dupe and a delete.
  * 3. Validate that all messages were received by TestStateStoreTask.
- * 4. Validate that TestStateStoreTask called store.put() for all four messages, and that the messages ended up in the mystore topic.
+ * 4. Validate that TestStateStoreTask called store.put() for all messages, and that the messages ended up in the mystore topic.
  * 5. Kill the job.
  * 6. Start the job again.
- * 7. Validate that the job restored all messages (1,2,3) to the store.
- * 8. Send three more messages to input (4,5,5), and validate that TestStateStoreTask receives them.
+ * 7. Validate that the job restored all messages STORE_CONTENTS_1 to the store.
+ * 8. Send three more messages to input MESSAGES_SEND_2, and validate that TestStateStoreTask receives them.
  * 9. Kill the job again.
  */
 class TestStatefulTask extends StreamTaskTestUtil {
@@ -86,23 +94,12 @@ class TestStatefulTask extends StreamTaskTestUtil {
     assertEquals(0, task.received.size)
 
     // Send some messages to input stream.
-    send(task, "1")
-    send(task, "2")
-    send(task, "3")
-    send(task, "2")
-    send(task, "99")
-    send(task, "-99")
+    TestStatefulTask.MESSAGES_SEND_1.foreach(m => send(task, m))
 
     // Validate that messages appear in store stream.
     val messages = readAll(TestStatefulTask.STATE_TOPIC_STREAM, 5, "testShouldStartTaskForFirstTime")
 
-    assertEquals(6, messages.length)
-    assertEquals("1", messages(0))
-    assertEquals("2", messages(1))
-    assertEquals("3", messages(2))
-    assertEquals("2", messages(3))
-    assertEquals("99", messages(4))
-    assertNull(messages(5))
+    assertEquals(TestStatefulTask.MESSAGES_RECV_1, messages)
 
     stopJob(job)
   }
@@ -111,52 +108,34 @@ class TestStatefulTask extends StreamTaskTestUtil {
     val (job, task) = startJob
 
     // Validate that restored has expected data.
-    assertEquals(3, task.asInstanceOf[StateStoreTestTask].restored.size)
-    assertTrue(task.asInstanceOf[StateStoreTestTask].restored.contains("1"))
-    assertTrue(task.asInstanceOf[StateStoreTestTask].restored.contains("2"))
-    assertTrue(task.asInstanceOf[StateStoreTestTask].restored.contains("3"))
+    assertEquals(TestStatefulTask.STORE_CONTENTS_1.length, task.asInstanceOf[StateStoreTestTask].restored.size)
+    TestStatefulTask.STORE_CONTENTS_1.foreach(m =>  assertTrue(task.asInstanceOf[StateStoreTestTask].restored.contains(m)))
 
     var count = 0
 
-    // We should get the original four messages in the stream (1,2,3,2).
+    // We should get the original six messages in the stream (1,2,3,2,99,-99).
     // Note that this will trigger four new outgoing messages to the STATE_TOPIC.
-    while (task.received.size < 4 && count < 100) {
+    while (task.received.size < TestStatefulTask.MESSAGES_RECV_1.length && count < 100) {
       Thread.sleep(600)
       count += 1
     }
 
     assertTrue("Timed out waiting to received messages. Received thus far: " + task.received.size, count < 100)
 
-    // Reset the count down latch after the 4 messages come in.
+    // Reset the count down latch after the 6 messages come in.
     task.awaitMessage
 
     // Send some messages to input stream.
-    send(task, "4")
-    send(task, "5")
-    send(task, "5")
+    TestStatefulTask.MESSAGES_SEND_2.foreach(m => send(task, m))
+
+    val expectedMessagesRcvd =  TestStatefulTask.MESSAGES_RECV_1 ++ // From initial start.
+                                TestStatefulTask.MESSAGES_RECV_1 ++ // From second startup.
+                                TestStatefulTask.MESSAGES_RECV_2    // From sending in this method.
 
     // Validate that messages appear in store stream.
-    val messages = readAll(TestStatefulTask.STATE_TOPIC_STREAM, 14, "testShouldRestoreStore")
-
-    assertEquals(15, messages.length)
-    // From initial start.
-    assertEquals("1", messages(0))
-    assertEquals("2", messages(1))
-    assertEquals("3", messages(2))
-    assertEquals("2", messages(3))
-    assertEquals("99", messages(4))
-    assertNull(messages(5))
-    // From second startup.
-    assertEquals("1", messages(6))
-    assertEquals("2", messages(7))
-    assertEquals("3", messages(8))
-    assertEquals("2", messages(9))
-    assertEquals("99", messages(10))
-    assertNull(messages(11))
-    // From sending in this method.
-    assertEquals("4", messages(12))
-    assertEquals("5", messages(13))
-    assertEquals("5", messages(14))
+    val messages = readAll(TestStatefulTask.STATE_TOPIC_STREAM, expectedMessagesRcvd.length-1, "testShouldRestoreStore")
+
+    assertEquals(expectedMessagesRcvd, messages)
 
     stopJob(job)
   }