You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/10/31 19:41:23 UTC
samza git commit: SAMZA-1971: Fix NPE in partition key computation
for InMemorySystemProducer
Repository: samza
Updated Branches:
refs/heads/master 7836bf08c -> ea5887178
SAMZA-1971: Fix NPE in partition key computation for InMemorySystemProducer
Author: bharathkk <co...@gmail.com>
Reviewers: Jagadish<ja...@apache.org>
Closes #786 from bharathkk/fix-inmemory-partitionkey-npe
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ea588717
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ea588717
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ea588717
Branch: refs/heads/master
Commit: ea5887178a63fc0ccfaf50bb3d5f826145b9c509
Parents: 7836bf0
Author: bharathkk <co...@gmail.com>
Authored: Wed Oct 31 12:41:20 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Oct 31 12:41:20 2018 -0700
----------------------------------------------------------------------
.../system/inmemory/InMemorySystemProducer.java | 22 ++++++++++-----
.../system/inmemory/TestInMemorySystem.java | 28 ++++++++++++++++++++
2 files changed, 44 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/ea588717/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java
index cd5e649..872488d 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java
@@ -19,7 +19,7 @@
package org.apache.samza.system.inmemory;
-import java.util.Optional;
+import com.google.common.base.Preconditions;
import org.apache.samza.Partition;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
@@ -75,11 +75,21 @@ public class InMemorySystemProducer implements SystemProducer {
Object key = envelope.getKey();
Object message = envelope.getMessage();
- // use the hashcode from partition key in the outgoing message envelope or default to message hashcode
- int hashCode = Optional.ofNullable(envelope.getPartitionKey())
- .map(Object::hashCode)
- .orElse(message.hashCode());
- int partition = Math.abs(hashCode) % memoryManager.getPartitionCountForSystemStream(envelope.getSystemStream());
+ Object partitionKey;
+ // We use the partition key from message if available, if not fallback to message key or use message as partition
+ // key as the final resort.
+ if (envelope.getPartitionKey() != null) {
+ partitionKey = envelope.getPartitionKey();
+ } else if (key != null) {
+ partitionKey = key;
+ } else {
+ partitionKey = message;
+ }
+
+ Preconditions.checkNotNull(partitionKey, "Failed to compute partition key for the message: " + envelope);
+
+ int partition =
+ Math.abs(partitionKey.hashCode()) % memoryManager.getPartitionCountForSystemStream(envelope.getSystemStream());
SystemStreamPartition ssp = new SystemStreamPartition(envelope.getSystemStream(), new Partition(partition));
memoryManager.put(ssp, key, message);
http://git-wip-us.apache.org/repos/asf/samza/blob/ea588717/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
index 7d5dfd0..0a2e221 100644
--- a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
+++ b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
@@ -142,6 +142,34 @@ public class TestInMemorySystem {
assertTrue(results.get(0).isEndOfStream());
}
+ @Test
+ public void testNullMessageWithValidMessageKey() {
+ final String messageKey = "validKey";
+ SystemProducer systemProducer = systemFactory.getProducer(SYSTEM_NAME, config, mockRegistry);
+ systemProducer.send(SOURCE, new OutgoingMessageEnvelope(SYSTEM_STREAM, messageKey, null));
+
+ SystemConsumer consumer = systemFactory.getConsumer(SYSTEM_NAME, config, mockRegistry);
+
+ Set<SystemStreamPartition> sspsToPoll = IntStream.range(0, PARTITION_COUNT)
+ .mapToObj(partition -> new SystemStreamPartition(SYSTEM_STREAM, new Partition(partition)))
+ .collect(Collectors.toSet());
+
+ // register the consumer for ssps
+ for (SystemStreamPartition ssp : sspsToPoll) {
+ consumer.register(ssp, "0");
+ }
+
+ List<IncomingMessageEnvelope> results = consumeRawMessages(consumer, sspsToPoll);
+ assertEquals(1, results.size());
+ assertEquals(results.get(0).getKey(), messageKey);
+ assertNull(results.get(0).getMessage());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testNullMessageWithNullKey() {
+ SystemProducer systemProducer = systemFactory.getProducer(SYSTEM_NAME, config, mockRegistry);
+ systemProducer.send(SOURCE, new OutgoingMessageEnvelope(SYSTEM_STREAM, null));
+ }
private <T> List<T> consumeMessages(Set<SystemStreamPartition> sspsToPoll) {
SystemConsumer systemConsumer = systemFactory.getConsumer(SYSTEM_NAME, config, mockRegistry);