You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/03/06 00:55:32 UTC

git commit: SAMZA-173; fix NPE when changelog restore includes a message with a null value

Repository: incubator-samza
Updated Branches:
  refs/heads/master 55929095f -> 12594fb71


SAMZA-173; fix NPE when changelog restore includes a message with a null value


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/12594fb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/12594fb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/12594fb7

Branch: refs/heads/master
Commit: 12594fb710260b62f9a21a8be785bd8dd5dcdd01
Parents: 5592909
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Wed Mar 5 15:55:16 2014 -0800
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Mar 5 15:55:16 2014 -0800

----------------------------------------------------------------------
 .../system/kafka/KafkaSystemConsumer.scala      |  2 +-
 .../storage/kv/KeyValueStorageEngine.scala      |  6 ++-
 .../test/integration/TestStatefulTask.scala     | 43 ++++++++++++++------
 3 files changed, 37 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/12594fb7/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index eb48aa3..afbd7cd 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -169,7 +169,7 @@ private[kafka] class KafkaSystemConsumer(
       } else {
         null
       }
-      val message = if (msg.message.buffer != null) {
+      val message = if (!msg.message.isNull) {
         deserializer.fromBytes(Utils.readBytes(msg.message.payload))
       } else {
         null

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/12594fb7/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index fc22383..f42ea02 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -91,7 +91,11 @@ class KeyValueStorageEngine[K, V](
         batch.clear()
       }
 
-      metrics.restoredBytes.inc(keyBytes.size + valBytes.size)
+      if (valBytes != null) {
+        metrics.restoredBytes.inc(valBytes.size)
+      }
+
+      metrics.restoredBytes.inc(keyBytes.size)
       metrics.restoredMessages.inc
       count += 1
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/12594fb7/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 493c984..7e81387 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -250,15 +250,19 @@ class TestStatefulTask {
     send(task, "2")
     send(task, "3")
     send(task, "2")
+    send(task, "99")
+    send(task, "-99")
 
     // Validate that messages appear in store stream.
-    val messages = readAll(STATE_TOPIC, 3, "testShouldStartTaskForFirstTime")
+    val messages = readAll(STATE_TOPIC, 5, "testShouldStartTaskForFirstTime")
 
-    assertEquals(4, messages.length)
+    assertEquals(6, messages.length)
     assertEquals("1", messages(0))
     assertEquals("2", messages(1))
     assertEquals("3", messages(2))
     assertEquals("2", messages(3))
+    assertEquals("99", messages(4))
+    assertEquals(null, messages(5))
 
     stopJob(job)
   }
@@ -292,23 +296,27 @@ class TestStatefulTask {
     send(task, "5")
 
     // Validate that messages appear in store stream.
-    val messages = readAll(STATE_TOPIC, 10, "testShouldRestoreStore")
+    val messages = readAll(STATE_TOPIC, 14, "testShouldRestoreStore")
 
-    assertEquals(11, messages.length)
+    assertEquals(15, messages.length)
     // From initial start.
     assertEquals("1", messages(0))
     assertEquals("2", messages(1))
     assertEquals("3", messages(2))
     assertEquals("2", messages(3))
+    assertEquals("99", messages(4))
+    assertEquals(null, messages(5))
     // From second startup.
-    assertEquals("1", messages(4))
-    assertEquals("2", messages(5))
-    assertEquals("3", messages(6))
+    assertEquals("1", messages(6))
     assertEquals("2", messages(7))
+    assertEquals("3", messages(8))
+    assertEquals("2", messages(9))
+    assertEquals("99", messages(10))
+    assertEquals(null, messages(11))
     // From sending in this method.
-    assertEquals("4", messages(8))
-    assertEquals("5", messages(9))
-    assertEquals("5", messages(10))
+    assertEquals("4", messages(12))
+    assertEquals("5", messages(13))
+    assertEquals("5", messages(14))
 
     stopJob(job)
   }
@@ -376,7 +384,11 @@ class TestStatefulTask {
 
     while (message == null || message.offset < maxOffsetInclusive) {
       message = stream.next
-      messages += new String(message.message, "UTF-8")
+      if (message.message == null) {
+        messages += null
+      } else {
+        messages += new String(message.message, "UTF-8")
+      }
       System.err.println("TestStatefulTask.readAll(): offset=%s, message=%s" format (message.offset, messages.last))
     }
 
@@ -436,7 +448,14 @@ class TestTask extends StreamTask with InitableTask {
     System.err.println("TestTask.process(): %s" format msg)
 
     received += msg
-    store.put(msg, msg)
+
+    // A message with a leading "-" means: delete the key that follows the "-"
+    if (msg.startsWith("-")) {
+      store.delete(msg.substring(1))
+    } else {
+      store.put(msg, msg)
+    }
+
     coordinator.commit
 
     // Notify sender that we got a message.