You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2018/04/28 11:24:11 UTC
storm git commit: [STORM-3043] Fix NullPointerException when apply()
returns null
Repository: storm
Updated Branches:
refs/heads/1.1.x-branch 22a962073 -> cfe11249d
[STORM-3043] Fix NullPointerException when apply() returns null
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cfe11249
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cfe11249
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cfe11249
Branch: refs/heads/1.1.x-branch
Commit: cfe11249da59182e09e7469baaaf665f2dcb3482
Parents: 22a9620
Author: cleroux <le...@gmail.com>
Authored: Fri Apr 27 22:36:05 2018 +0000
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sat Apr 28 13:23:56 2018 +0200
----------------------------------------------------------------------
.../kafka/spout/SimpleRecordTranslator.java | 6 +-
.../spout/ByTopicRecordTranslatorTest.java | 15 +++++
.../kafka/spout/SimpleRecordTranslatorTest.java | 59 ++++++++++++++++++++
3 files changed, 79 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/cfe11249/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
index 46c2849..41f9f4e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
@@ -41,8 +41,12 @@ public class SimpleRecordTranslator<K, V> implements RecordTranslator<K, V> {
@Override
public List<Object> apply(ConsumerRecord<K, V> record) {
+ List<Object> vals = func.apply(record);
+ if (vals == null) {
+ return null;
+ }
KafkaTuple ret = new KafkaTuple();
- ret.addAll(func.apply(record));
+ ret.addAll(vals);
return ret.routedTo(stream);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/cfe11249/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
index abc58f0..22845f0 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
@@ -49,6 +49,13 @@ public class ByTopicRecordTranslatorTest {
return new Values(record.key(), record.value());
}
};
+
+ public static Func<ConsumerRecord<String, String>, List<Object>> NULL_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+ @Override
+ public List<Object> apply(ConsumerRecord<String, String> record) {
+ return null;
+ }
+ };
@Test
public void testBasic() {
@@ -74,6 +81,14 @@ public class ByTopicRecordTranslatorTest {
assertEquals(new Fields("key", "value"), trans.getFieldsFor("key-value-stream"));
assertEquals(Arrays.asList("THE KEY", "THE VALUE"), trans.apply(cr3));
}
+
+ @Test
+ public void testNullTranslation() {
+ ByTopicRecordTranslator<String, String> trans =
+ new ByTopicRecordTranslator<>(NULL_FUNC, new Fields("key"));
+ ConsumerRecord<String, String> cr = new ConsumerRecord<>("TOPIC 1", 100, 100, "THE KEY", "THE VALUE");
+ assertEquals(null, trans.apply(cr));
+ }
@Test(expected = IllegalArgumentException.class)
public void testFieldCollision() {
http://git-wip-us.apache.org/repos/asf/storm/blob/cfe11249/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SimpleRecordTranslatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SimpleRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SimpleRecordTranslatorTest.java
new file mode 100644
index 0000000..03bbc78
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SimpleRecordTranslatorTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.junit.Test;
+
+public class SimpleRecordTranslatorTest {
+ public static Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+ @Override
+ public List<Object> apply(ConsumerRecord<String, String> record) {
+ return new Values(record.value());
+ }
+ };
+
+ public static Func<ConsumerRecord<String, String>, List<Object>> NULL_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+ @Override
+ public List<Object> apply(ConsumerRecord<String, String> record) {
+ return null;
+ }
+ };
+
+ @Test
+ public void testBasic() {
+ SimpleRecordTranslator<String, String> trans =
+ new SimpleRecordTranslator<>(JUST_VALUE_FUNC, new Fields("value"));
+ assertEquals(Arrays.asList("default"), trans.streams());
+ ConsumerRecord<String, String> cr = new ConsumerRecord<>("TOPIC", 100, 100, "THE KEY", "THE VALUE");
+ assertEquals(Arrays.asList("THE VALUE"), trans.apply(cr));
+ }
+
+ @Test
+ public void testNullTranslation() {
+ SimpleRecordTranslator<String, String> trans =
+ new SimpleRecordTranslator<>(NULL_FUNC, new Fields("key"));
+ assertEquals(null, trans.apply(null));
+ }
+}