You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/03/29 06:51:01 UTC
[samza] branch master updated: SAMZA-2143: NPE in CoordinatorStreamMessage#equals and some clean-up to CoordinatorStreamSystemConsumer (#970)
This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 4aaa589 SAMZA-2143: NPE in CoordinatorStreamMessage#equals and some clean-up to CoordinatorStreamSystemConsumer (#970)
4aaa589 is described below
commit 4aaa589cdd22250719c5c30a19f4f95f4c327110
Author: Daniel Nishimura <dn...@gmail.com>
AuthorDate: Thu Mar 28 23:50:57 2019 -0700
SAMZA-2143: NPE in CoordinatorStreamMessage#equals and some clean-up to CoordinatorStreamSystemConsumer (#970)
---
.../stream/CoordinatorStreamSystemConsumer.java | 12 +++++-------
.../stream/messages/CoordinatorStreamMessage.java | 4 +++-
.../coordinator/stream/TestCoordinatorStreamMessage.java | 15 +++++++++++++++
.../stream/TestCoordinatorStreamSystemConsumer.java | 2 +-
4 files changed, 24 insertions(+), 9 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index 38255a2..ff28723 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -202,13 +202,11 @@ public class CoordinatorStreamSystemConsumer {
}
}
- public Set<CoordinatorStreamMessage> getBoostrappedStream() {
- log.info("Returning the bootstrapped data from the stream");
- if (!isBootstrapped)
- bootstrap();
- return bootstrappedStreamSet;
- }
-
+ /**
+ * Returns the set of bootstrapped {@link CoordinatorStreamMessage}s
+ * @param type The type of {@link CoordinatorStreamMessage}s to return.
+ * @return The bootstrapped {@link CoordinatorStreamMessage}s
+ */
public Set<CoordinatorStreamMessage> getBootstrappedStream(String type) {
log.debug("Bootstrapping coordinator stream for messages of type {}", type);
bootstrap();
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java
index 415c8a5..a0e8794 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
* </p>
*
* <pre>
- * key => [1, "set-config", "job.name"]
+ * key =&gt; [1, "set-config", "job.name"]
*
* message => {
* "host": "192.168.0.1",
@@ -312,6 +312,8 @@ public class CoordinatorStreamMessage {
if (messageMap == null) {
if (other.messageMap != null)
return false;
+ } else if (getMessageValues() == null) {
+ return other.getMessageValues() == null;
} else if (!getMessageValues().equals(other.getMessageValues()))
return false;
return true;
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
index 3e8015b..3fd803c 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
+import java.util.HashMap;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.coordinator.stream.messages.Delete;
import org.apache.samza.coordinator.stream.messages.SetConfig;
@@ -80,4 +81,18 @@ public class TestCoordinatorStreamMessage {
assertEquals(message, message1);
assertTrue(!message.equals(message2));
}
+
+ @Test
+ public void testEqualsNPEonNullValues() {
+ String[] testKeys = {"key1", "key2"};
+ HashMap<String, Object> messageMap = new HashMap<>();
+ messageMap.put("values", new HashMap<String, String>());
+ HashMap<String, Object> messageMapWithNullValues = new HashMap<>();
+ messageMapWithNullValues.put("values", null);
+ CoordinatorStreamMessage message = new CoordinatorStreamMessage(testKeys, messageMap);
+ CoordinatorStreamMessage messageWithNullValue = new CoordinatorStreamMessage(testKeys, messageMapWithNullValues);
+
+ assertFalse("Should not throw NPE and should not be equal to each other.",
+ messageWithNullValue.equals(message));
+ }
}
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
index 03dcbb1..6da2c8f 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
@@ -120,7 +120,7 @@ public class TestCoordinatorStreamSystemConsumer {
consumer.bootstrap();
- Set<CoordinatorStreamMessage> bootstrappedMessages = consumer.getBoostrappedStream();
+ Set<CoordinatorStreamMessage> bootstrappedMessages = consumer.getBootstrappedStream(SetConfig.TYPE);
assertEquals(2, bootstrappedMessages.size()); // First message should have been removed as a duplicate
CoordinatorStreamMessage[] coordinatorStreamMessages = bootstrappedMessages.toArray(new CoordinatorStreamMessage[2]);