You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@storm.apache.org by sr...@apache.org on 2018/04/28 11:24:11 UTC

storm git commit: [STORM-3043] Fix NullPointerException when apply() returns null

Repository: storm
Updated Branches:
  refs/heads/1.1.x-branch 22a962073 -> cfe11249d


[STORM-3043] Fix NullPointerException when apply() returns null


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cfe11249
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cfe11249
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cfe11249

Branch: refs/heads/1.1.x-branch
Commit: cfe11249da59182e09e7469baaaf665f2dcb3482
Parents: 22a9620
Author: cleroux <le...@gmail.com>
Authored: Fri Apr 27 22:36:05 2018 +0000
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sat Apr 28 13:23:56 2018 +0200

----------------------------------------------------------------------
 .../kafka/spout/SimpleRecordTranslator.java     |  6 +-
 .../spout/ByTopicRecordTranslatorTest.java      | 15 +++++
 .../kafka/spout/SimpleRecordTranslatorTest.java | 59 ++++++++++++++++++++
 3 files changed, 79 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/cfe11249/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
index 46c2849..41f9f4e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
@@ -41,8 +41,12 @@ public class SimpleRecordTranslator<K, V> implements RecordTranslator<K, V> {
     
     @Override
     public List<Object> apply(ConsumerRecord<K, V> record) {
+        List<Object> vals = func.apply(record);
+        if (vals == null) {
+            return null;
+        }
         KafkaTuple ret = new KafkaTuple();
-        ret.addAll(func.apply(record));
+        ret.addAll(vals);
         return ret.routedTo(stream);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/cfe11249/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
index abc58f0..22845f0 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
@@ -49,6 +49,13 @@ public class ByTopicRecordTranslatorTest {
             return new Values(record.key(), record.value());
         }
     };
+
+    public static Func<ConsumerRecord<String, String>, List<Object>> NULL_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> record) {
+            return null;
+        }
+    };
     
     @Test
     public void testBasic() {
@@ -74,6 +81,14 @@ public class ByTopicRecordTranslatorTest {
         assertEquals(new Fields("key", "value"), trans.getFieldsFor("key-value-stream"));
         assertEquals(Arrays.asList("THE KEY", "THE VALUE"), trans.apply(cr3));
     }
+
+    @Test
+    public void testNullTranslation() {
+        ByTopicRecordTranslator<String, String> trans =
+                new ByTopicRecordTranslator<>(NULL_FUNC, new Fields("key"));
+        ConsumerRecord<String, String> cr = new ConsumerRecord<>("TOPIC 1", 100, 100, "THE KEY", "THE VALUE");
+        assertEquals(null, trans.apply(cr));
+    }
     
     @Test(expected = IllegalArgumentException.class)
     public void testFieldCollision() {

http://git-wip-us.apache.org/repos/asf/storm/blob/cfe11249/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SimpleRecordTranslatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SimpleRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SimpleRecordTranslatorTest.java
new file mode 100644
index 0000000..03bbc78
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SimpleRecordTranslatorTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.junit.Test;
+
+public class SimpleRecordTranslatorTest {
+    public static Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> record) {
+            return new Values(record.value());
+        }
+    };
+
+    public static Func<ConsumerRecord<String, String>, List<Object>> NULL_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> record) {
+            return null;
+        }
+    };
+
+    @Test
+    public void testBasic() {
+        SimpleRecordTranslator<String, String> trans =
+                new SimpleRecordTranslator<>(JUST_VALUE_FUNC, new Fields("value"));
+        assertEquals(Arrays.asList("default"), trans.streams());
+        ConsumerRecord<String, String> cr = new ConsumerRecord<>("TOPIC", 100, 100, "THE KEY", "THE VALUE");
+        assertEquals(Arrays.asList("THE VALUE"), trans.apply(cr));
+    }
+
+    @Test
+    public void testNullTranslation() {
+        SimpleRecordTranslator<String, String> trans =
+                new SimpleRecordTranslator<>(NULL_FUNC, new Fields("key"));
+        assertEquals(null, trans.apply(null));
+    }
+}