You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/03/29 06:51:01 UTC

[samza] branch master updated: SAMZA-2143: NPE in CoordinatorStreamMessage#equals and some clean-up to CoordinatorStreamSystemConsumer (#970)

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 4aaa589  SAMZA-2143: NPE in CoordinatorStreamMessage#equals and some clean-up to CoordinatorStreamSystemConsumer (#970)
4aaa589 is described below

commit 4aaa589cdd22250719c5c30a19f4f95f4c327110
Author: Daniel Nishimura <dn...@gmail.com>
AuthorDate: Thu Mar 28 23:50:57 2019 -0700

    SAMZA-2143: NPE in CoordinatorStreamMessage#equals and some clean-up to CoordinatorStreamSystemConsumer (#970)
---
 .../stream/CoordinatorStreamSystemConsumer.java           | 12 +++++-------
 .../stream/messages/CoordinatorStreamMessage.java         |  4 +++-
 .../coordinator/stream/TestCoordinatorStreamMessage.java  | 15 +++++++++++++++
 .../stream/TestCoordinatorStreamSystemConsumer.java       |  2 +-
 4 files changed, 24 insertions(+), 9 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index 38255a2..ff28723 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -202,13 +202,11 @@ public class CoordinatorStreamSystemConsumer {
     }
   }
 
-  public Set<CoordinatorStreamMessage> getBoostrappedStream() {
-    log.info("Returning the bootstrapped data from the stream");
-    if (!isBootstrapped)
-      bootstrap();
-    return bootstrappedStreamSet;
-  }
-
+  /**
+   * Returns the set of bootstrapped {@link CoordinatorStreamMessage}s
+   * @param type The type of {@link CoordinatorStreamMessage}s to return.
+   * @return The bootstrapped {@link CoordinatorStreamMessage}s
+   */
   public Set<CoordinatorStreamMessage> getBootstrappedStream(String type) {
     log.debug("Bootstrapping coordinator stream for messages of type {}", type);
     bootstrap();
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java
index 415c8a5..a0e8794 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
  * </p>
  *
  * <pre>
- * key =&gt; [1, "set-config", "job.name"] 
+ * key =&gt; [1, "set-config", "job.name"]
  *
  * message =&gt; {
  *   "host": "192.168.0.1",
@@ -312,6 +312,8 @@ public class CoordinatorStreamMessage {
     if (messageMap == null) {
       if (other.messageMap != null)
         return false;
+    } else if (getMessageValues() == null) {
+      return other.getMessageValues() == null;
     } else if (!getMessageValues().equals(other.getMessageValues()))
       return false;
     return true;
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
index 3e8015b..3fd803c 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 
+import java.util.HashMap;
 import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.coordinator.stream.messages.Delete;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
@@ -80,4 +81,18 @@ public class TestCoordinatorStreamMessage {
     assertEquals(message, message1);
     assertTrue(!message.equals(message2));
   }
+
+  @Test
+  public void testEqualsNPEonNullValues() {
+    String[] testKeys = {"key1", "key2"};
+    HashMap<String, Object> messageMap = new HashMap<>();
+    messageMap.put("values", new HashMap<String, String>());
+    HashMap<String, Object> messageMapWithNullValues = new HashMap<>();
+    messageMapWithNullValues.put("values", null);
+    CoordinatorStreamMessage message = new CoordinatorStreamMessage(testKeys, messageMap);
+    CoordinatorStreamMessage messageWithNullValue = new CoordinatorStreamMessage(testKeys, messageMapWithNullValues);
+
+    assertFalse("Should not throw NPE and should not be equal to each other.",
+        messageWithNullValue.equals(message));
+  }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
index 03dcbb1..6da2c8f 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
@@ -120,7 +120,7 @@ public class TestCoordinatorStreamSystemConsumer {
 
     consumer.bootstrap();
 
-    Set<CoordinatorStreamMessage> bootstrappedMessages = consumer.getBoostrappedStream();
+    Set<CoordinatorStreamMessage> bootstrappedMessages = consumer.getBootstrappedStream(SetConfig.TYPE);
 
     assertEquals(2, bootstrappedMessages.size()); // First message should have been removed as a duplicate
     CoordinatorStreamMessage[] coordinatorStreamMessages = bootstrappedMessages.toArray(new CoordinatorStreamMessage[2]);