You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/03/06 00:55:32 UTC
git commit: SAMZA-173;
fix NPE when changelog restore includes a message with a null value
Repository: incubator-samza
Updated Branches:
refs/heads/master 55929095f -> 12594fb71
SAMZA-173; fix NPE when changelog restore includes a message with a null value
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/12594fb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/12594fb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/12594fb7
Branch: refs/heads/master
Commit: 12594fb710260b62f9a21a8be785bd8dd5dcdd01
Parents: 5592909
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Wed Mar 5 15:55:16 2014 -0800
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Mar 5 15:55:16 2014 -0800
----------------------------------------------------------------------
.../system/kafka/KafkaSystemConsumer.scala | 2 +-
.../storage/kv/KeyValueStorageEngine.scala | 6 ++-
.../test/integration/TestStatefulTask.scala | 43 ++++++++++++++------
3 files changed, 37 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/12594fb7/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index eb48aa3..afbd7cd 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -169,7 +169,7 @@ private[kafka] class KafkaSystemConsumer(
} else {
null
}
- val message = if (msg.message.buffer != null) {
+ val message = if (!msg.message.isNull) {
deserializer.fromBytes(Utils.readBytes(msg.message.payload))
} else {
null
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/12594fb7/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index fc22383..f42ea02 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -91,7 +91,11 @@ class KeyValueStorageEngine[K, V](
batch.clear()
}
- metrics.restoredBytes.inc(keyBytes.size + valBytes.size)
+ if (valBytes != null) {
+ metrics.restoredBytes.inc(valBytes.size)
+ }
+
+ metrics.restoredBytes.inc(keyBytes.size)
metrics.restoredMessages.inc
count += 1
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/12594fb7/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 493c984..7e81387 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -250,15 +250,19 @@ class TestStatefulTask {
send(task, "2")
send(task, "3")
send(task, "2")
+ send(task, "99")
+ send(task, "-99")
// Validate that messages appear in store stream.
- val messages = readAll(STATE_TOPIC, 3, "testShouldStartTaskForFirstTime")
+ val messages = readAll(STATE_TOPIC, 5, "testShouldStartTaskForFirstTime")
- assertEquals(4, messages.length)
+ assertEquals(6, messages.length)
assertEquals("1", messages(0))
assertEquals("2", messages(1))
assertEquals("3", messages(2))
assertEquals("2", messages(3))
+ assertEquals("99", messages(4))
+ assertEquals(null, messages(5))
stopJob(job)
}
@@ -292,23 +296,27 @@ class TestStatefulTask {
send(task, "5")
// Validate that messages appear in store stream.
- val messages = readAll(STATE_TOPIC, 10, "testShouldRestoreStore")
+ val messages = readAll(STATE_TOPIC, 14, "testShouldRestoreStore")
- assertEquals(11, messages.length)
+ assertEquals(15, messages.length)
// From initial start.
assertEquals("1", messages(0))
assertEquals("2", messages(1))
assertEquals("3", messages(2))
assertEquals("2", messages(3))
+ assertEquals("99", messages(4))
+ assertEquals(null, messages(5))
// From second startup.
- assertEquals("1", messages(4))
- assertEquals("2", messages(5))
- assertEquals("3", messages(6))
+ assertEquals("1", messages(6))
assertEquals("2", messages(7))
+ assertEquals("3", messages(8))
+ assertEquals("2", messages(9))
+ assertEquals("99", messages(10))
+ assertEquals(null, messages(11))
// From sending in this method.
- assertEquals("4", messages(8))
- assertEquals("5", messages(9))
- assertEquals("5", messages(10))
+ assertEquals("4", messages(12))
+ assertEquals("5", messages(13))
+ assertEquals("5", messages(14))
stopJob(job)
}
@@ -376,7 +384,11 @@ class TestStatefulTask {
while (message == null || message.offset < maxOffsetInclusive) {
message = stream.next
- messages += new String(message.message, "UTF-8")
+ if (message.message == null) {
+ messages += null
+ } else {
+ messages += new String(message.message, "UTF-8")
+ }
System.err.println("TestStatefulTask.readAll(): offset=%s, message=%s" format (message.offset, messages.last))
}
@@ -436,7 +448,14 @@ class TestTask extends StreamTask with InitableTask {
System.err.println("TestTask.process(): %s" format msg)
received += msg
- store.put(msg, msg)
+
+ // A negative string means delete
+ if (msg.startsWith("-")) {
+ store.delete(msg.substring(1))
+ } else {
+ store.put(msg, msg)
+ }
+
coordinator.commit
// Notify sender that we got a message.