You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/08/11 22:50:28 UTC
samza git commit: SAMZA-1388: Flaky test - TestStatefulTask#testShouldStartAndRestore
Repository: samza
Updated Branches:
refs/heads/master 46b3601f1 -> b1277d8b4
SAMZA-1388: Flaky test - TestStatefulTask#testShouldStartAndRestore
I believe the problem originated from SAMZA-173.
The core issue is that testShouldRestoreStore was not updated to expect 6 messages after 2 more messages were added to testShouldStartTaskForFirstTime.
Fixed the issue and refactored the code so the 2 methods wouldn't disagree again in the future.
Author: Jacob Maes <jm...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@linkedin.com>
Closes #269 from jmakes/samza-1388
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b1277d8b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b1277d8b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b1277d8b
Branch: refs/heads/master
Commit: b1277d8b4cbda3a12f00dd2c8e86980086f2c640
Parents: 46b3601
Author: Jacob Maes <jm...@linkedin.com>
Authored: Fri Aug 11 15:50:12 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Fri Aug 11 15:50:12 2017 -0700
----------------------------------------------------------------------
.../test/integration/TestStatefulTask.scala | 79 +++++++-------------
1 file changed, 29 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/b1277d8b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index e5b6756..734487b 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -30,8 +30,16 @@ import org.junit.{AfterClass, BeforeClass, Test}
import scala.collection.JavaConverters._
object TestStatefulTask {
- val STORE_NAME = "mystore"
- val STATE_TOPIC_STREAM = "mystoreChangelog"
+ val STORE_NAME = "mystore"
+ val STATE_TOPIC_STREAM = "mystoreChangelog"
+
+ // Messages with one dupe and one delete. A negative string means delete. See StateStoreTestTask.testProcess()
+ val MESSAGES_SEND_1 = List("1", "2", "3", "2", "99", "-99")
+ val MESSAGES_RECV_1 = List("1", "2", "3", "2", "99", null)
+ val STORE_CONTENTS_1 = List("1", "2", "3")
+
+ val MESSAGES_SEND_2 = List("4", "5", "5")
+ val MESSAGES_RECV_2 = List("4", "5", "5")
@BeforeClass
def beforeSetupServers {
@@ -47,13 +55,13 @@ object TestStatefulTask {
/**
* Test that does the following:
* 1. Start a single partition of TestStateStoreTask using ThreadJobFactory.
- * 2. Send four messages to input (1,2,3,2), which contain one dupe (2).
+ * 2. Send MESSAGES_SEND_1, which contains a dupe and a delete.
* 3. Validate that all messages were received by TestStateStoreTask.
- * 4. Validate that TestStateStoreTask called store.put() for all four messages, and that the messages ended up in the mystore topic.
+ * 4. Validate that TestStateStoreTask called store.put() for all messages, and that the messages ended up in the mystore topic.
* 5. Kill the job.
* 6. Start the job again.
- * 7. Validate that the job restored all messages (1,2,3) to the store.
- * 8. Send three more messages to input (4,5,5), and validate that TestStateStoreTask receives them.
+ * 7. Validate that the job restored all messages STORE_CONTENTS_1 to the store.
+ * 8. Send three more messages to input MESSAGES_SEND_2, and validate that TestStateStoreTask receives them.
* 9. Kill the job again.
*/
class TestStatefulTask extends StreamTaskTestUtil {
@@ -86,23 +94,12 @@ class TestStatefulTask extends StreamTaskTestUtil {
assertEquals(0, task.received.size)
// Send some messages to input stream.
- send(task, "1")
- send(task, "2")
- send(task, "3")
- send(task, "2")
- send(task, "99")
- send(task, "-99")
+ TestStatefulTask.MESSAGES_SEND_1.foreach(m => send(task, m))
// Validate that messages appear in store stream.
val messages = readAll(TestStatefulTask.STATE_TOPIC_STREAM, 5, "testShouldStartTaskForFirstTime")
- assertEquals(6, messages.length)
- assertEquals("1", messages(0))
- assertEquals("2", messages(1))
- assertEquals("3", messages(2))
- assertEquals("2", messages(3))
- assertEquals("99", messages(4))
- assertNull(messages(5))
+ assertEquals(TestStatefulTask.MESSAGES_RECV_1, messages)
stopJob(job)
}
@@ -111,52 +108,34 @@ class TestStatefulTask extends StreamTaskTestUtil {
val (job, task) = startJob
// Validate that restored has expected data.
- assertEquals(3, task.asInstanceOf[StateStoreTestTask].restored.size)
- assertTrue(task.asInstanceOf[StateStoreTestTask].restored.contains("1"))
- assertTrue(task.asInstanceOf[StateStoreTestTask].restored.contains("2"))
- assertTrue(task.asInstanceOf[StateStoreTestTask].restored.contains("3"))
+ assertEquals(TestStatefulTask.STORE_CONTENTS_1.length, task.asInstanceOf[StateStoreTestTask].restored.size)
+ TestStatefulTask.STORE_CONTENTS_1.foreach(m => assertTrue(task.asInstanceOf[StateStoreTestTask].restored.contains(m)))
var count = 0
- // We should get the original four messages in the stream (1,2,3,2).
+ // We should get the original size messages in the stream (1,2,3,2,99,-99).
// Note that this will trigger four new outgoing messages to the STATE_TOPIC.
- while (task.received.size < 4 && count < 100) {
+ while (task.received.size < TestStatefulTask.MESSAGES_RECV_1.length && count < 100) {
Thread.sleep(600)
count += 1
}
assertTrue("Timed out waiting to received messages. Received thus far: " + task.received.size, count < 100)
- // Reset the count down latch after the 4 messages come in.
+ // Reset the count down latch after the 6 messages come in.
task.awaitMessage
// Send some messages to input stream.
- send(task, "4")
- send(task, "5")
- send(task, "5")
+ TestStatefulTask.MESSAGES_SEND_2.foreach(m => send(task, m))
+
+ val expectedMessagesRcvd = TestStatefulTask.MESSAGES_RECV_1 ++ // From initial start.
+ TestStatefulTask.MESSAGES_RECV_1 ++ // From second startup.
+ TestStatefulTask.MESSAGES_RECV_2 // From sending in this method.
// Validate that messages appear in store stream.
- val messages = readAll(TestStatefulTask.STATE_TOPIC_STREAM, 14, "testShouldRestoreStore")
-
- assertEquals(15, messages.length)
- // From initial start.
- assertEquals("1", messages(0))
- assertEquals("2", messages(1))
- assertEquals("3", messages(2))
- assertEquals("2", messages(3))
- assertEquals("99", messages(4))
- assertNull(messages(5))
- // From second startup.
- assertEquals("1", messages(6))
- assertEquals("2", messages(7))
- assertEquals("3", messages(8))
- assertEquals("2", messages(9))
- assertEquals("99", messages(10))
- assertNull(messages(11))
- // From sending in this method.
- assertEquals("4", messages(12))
- assertEquals("5", messages(13))
- assertEquals("5", messages(14))
+ val messages = readAll(TestStatefulTask.STATE_TOPIC_STREAM, expectedMessagesRcvd.length-1, "testShouldRestoreStore")
+
+ assertEquals(expectedMessagesRcvd, messages)
stopJob(job)
}