You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/10/31 19:41:23 UTC

samza git commit: SAMZA-1971: Fix NPE in partition key computation for InMemorySystemProducer

Repository: samza
Updated Branches:
  refs/heads/master 7836bf08c -> ea5887178


SAMZA-1971: Fix NPE in partition key computation for InMemorySystemProducer

Author: bharathkk <co...@gmail.com>

Reviewers: Jagadish<ja...@apache.org>

Closes #786 from bharathkk/fix-inmemory-partitionkey-npe


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ea588717
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ea588717
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ea588717

Branch: refs/heads/master
Commit: ea5887178a63fc0ccfaf50bb3d5f826145b9c509
Parents: 7836bf0
Author: bharathkk <co...@gmail.com>
Authored: Wed Oct 31 12:41:20 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Oct 31 12:41:20 2018 -0700

----------------------------------------------------------------------
 .../system/inmemory/InMemorySystemProducer.java | 22 ++++++++++-----
 .../system/inmemory/TestInMemorySystem.java     | 28 ++++++++++++++++++++
 2 files changed, 44 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ea588717/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java
index cd5e649..872488d 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java
@@ -19,7 +19,7 @@
 
 package org.apache.samza.system.inmemory;
 
-import java.util.Optional;
+import com.google.common.base.Preconditions;
 import org.apache.samza.Partition;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemProducer;
@@ -75,11 +75,21 @@ public class InMemorySystemProducer implements SystemProducer {
     Object key = envelope.getKey();
     Object message = envelope.getMessage();
 
-    // use the hashcode from partition key in the outgoing message envelope or default to message hashcode
-    int hashCode = Optional.ofNullable(envelope.getPartitionKey())
-        .map(Object::hashCode)
-        .orElse(message.hashCode());
-    int partition = Math.abs(hashCode) % memoryManager.getPartitionCountForSystemStream(envelope.getSystemStream());
+    Object partitionKey;
+    // Use the partition key from the envelope if available; otherwise fall back to the message key, and use the
+    // message itself as the partition key as a last resort.
+    if (envelope.getPartitionKey() != null) {
+      partitionKey = envelope.getPartitionKey();
+    } else if (key != null) {
+      partitionKey = key;
+    } else {
+      partitionKey = message;
+    }
+
+    Preconditions.checkNotNull(partitionKey, "Failed to compute partition key for the message: " + envelope);
+
+    int partition =
+        Math.abs(partitionKey.hashCode()) % memoryManager.getPartitionCountForSystemStream(envelope.getSystemStream());
 
     SystemStreamPartition ssp = new SystemStreamPartition(envelope.getSystemStream(), new Partition(partition));
     memoryManager.put(ssp, key, message);

http://git-wip-us.apache.org/repos/asf/samza/blob/ea588717/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
index 7d5dfd0..0a2e221 100644
--- a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
+++ b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
@@ -142,6 +142,34 @@ public class TestInMemorySystem {
     assertTrue(results.get(0).isEndOfStream());
   }
 
+  @Test
+  public void testNullMessageWithValidMessageKey() {
+    final String messageKey = "validKey";
+    SystemProducer systemProducer = systemFactory.getProducer(SYSTEM_NAME, config, mockRegistry);
+    systemProducer.send(SOURCE, new OutgoingMessageEnvelope(SYSTEM_STREAM, messageKey, null));
+
+    SystemConsumer consumer = systemFactory.getConsumer(SYSTEM_NAME, config, mockRegistry);
+
+    Set<SystemStreamPartition> sspsToPoll = IntStream.range(0, PARTITION_COUNT)
+        .mapToObj(partition -> new SystemStreamPartition(SYSTEM_STREAM, new Partition(partition)))
+        .collect(Collectors.toSet());
+
+    // register the consumer for ssps
+    for (SystemStreamPartition ssp : sspsToPoll) {
+      consumer.register(ssp, "0");
+    }
+
+    List<IncomingMessageEnvelope> results = consumeRawMessages(consumer, sspsToPoll);
+    assertEquals(1, results.size());
+    assertEquals(results.get(0).getKey(), messageKey);
+    assertNull(results.get(0).getMessage());
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testNullMessageWithNullKey() {
+    SystemProducer systemProducer = systemFactory.getProducer(SYSTEM_NAME, config, mockRegistry);
+    systemProducer.send(SOURCE, new OutgoingMessageEnvelope(SYSTEM_STREAM, null));
+  }
 
   private <T> List<T> consumeMessages(Set<SystemStreamPartition> sspsToPoll) {
     SystemConsumer systemConsumer = systemFactory.getConsumer(SYSTEM_NAME, config, mockRegistry);