You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/05/07 12:54:16 UTC

[1/2] storm git commit: STORM-3059: Fix NPE when the processing guarantee is not AT_LEAST_ONCE and the spout filters out a null tuple

Repository: storm
Updated Branches:
  refs/heads/1.x-branch ec2599999 -> a4447e64d


STORM-3059: Fix NPE when the processing guarantee is not AT_LEAST_ONCE and the spout filters out a null tuple


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d70b4754
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d70b4754
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d70b4754

Branch: refs/heads/1.x-branch
Commit: d70b475456e2d7cb83e80b45560f48b58dffd4d2
Parents: ec25999
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sun May 6 10:16:25 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Mon May 7 08:36:38 2018 +0200

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |  8 ++-
 .../storm/kafka/NullRecordTranslator.java       | 42 ++++++++++++++
 .../spout/KafkaSpoutMessagingGuaranteeTest.java | 59 +++++++++++++++-----
 .../kafka/spout/KafkaSpoutNullTupleTest.java    | 36 +-----------
 4 files changed, 95 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d70b4754/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index e7f5156..e8ecb3e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -503,9 +503,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 /*if a null tuple is not configured to be emitted, it should be marked as emitted and acked immediately
                 * to allow its offset to be commited to Kafka*/
                 LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", record);
-                msgId.setNullTuple(true);
-                offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
-                ack(msgId);
+                if (isAtLeastOnceProcessing()) {
+                    msgId.setNullTuple(true);
+                    offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
+                    ack(msgId);
+                }
             }
         }
         return false;

http://git-wip-us.apache.org/repos/asf/storm/blob/d70b4754/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/NullRecordTranslator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/NullRecordTranslator.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/NullRecordTranslator.java
new file mode 100644
index 0000000..f2b2f98
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/NullRecordTranslator.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.tuple.Fields;
+
+public class NullRecordTranslator<K, V> implements RecordTranslator<K, V> {
+
+    @Override
+    public List<Object> apply(ConsumerRecord<K, V> record) {
+        return null;
+
+    }
+
+    @Override
+    public Fields getFieldsFor(String stream) {
+        return new Fields("topic", "key", "value");
+    }
+
+    @Override
+    public List<String> streams() {
+        return Collections.singletonList("default");
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d70b4754/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
index 082cc58..ca16237 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -21,8 +21,10 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isNull;
@@ -44,6 +46,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.NullRecordTranslator;
 import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
 import org.apache.storm.spout.SpoutOutputCollector;
@@ -64,7 +67,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
 
     @Captor
     private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
-
+    
     private final TopologyContext contextMock = mock(TopologyContext.class);
     private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
     private final Map<String, Object> conf = new HashMap<>();
@@ -96,7 +99,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
         inOrder.verify(consumerMock).poll(anyLong());
         inOrder.verify(consumerMock).commitSync(commitCapture.capture());
         inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
-
+        
         CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE);
         Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
         assertThat(committedOffsets.get(partition).offset(), is(0L));
@@ -191,7 +194,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
             .setTupleTrackingEnforced(true)
             .build();
         try (SimulatedTime time = new SimulatedTime()) {
-            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock,partition);
 
             when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
                 SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
@@ -204,13 +207,13 @@ public class KafkaSpoutMessagingGuaranteeTest {
             assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
 
             spout.ack(msgIdCaptor.getValue());
-
+            
             Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
-
+            
             when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.<TopicPartition, List<ConsumerRecord<String, String>>>emptyMap()));
-
+            
             spout.nextTuple();
-
+            
             verify(consumerMock, never()).commitSync(argThat(new ArgumentMatcher<Map<TopicPartition, OffsetAndMetadata>>() {
                 @Override
                 public boolean matches(Object arg) {
@@ -228,27 +231,27 @@ public class KafkaSpoutMessagingGuaranteeTest {
             .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
             .setTupleTrackingEnforced(true)
             .build();
-
+        
         try (SimulatedTime time = new SimulatedTime()) {
-            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock,partition);
 
             when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
                 SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
 
             spout.nextTuple();
-
+            
             when(consumerMock.position(partition)).thenReturn(1L);
 
             ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
             verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
             assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
-
+            
             Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
-
+            
             spout.nextTuple();
-
+            
             verify(consumerMock).commitAsync(commitCapture.capture(), isNull(OffsetCommitCallback.class));
-
+            
             CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE);
             Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
             assertThat(committedOffsets.get(partition).offset(), is(1L));
@@ -256,4 +259,32 @@ public class KafkaSpoutMessagingGuaranteeTest {
         }
     }
 
+    private void doFilterNullTupleTest(KafkaSpoutConfig.ProcessingGuarantee processingGuaranteee) {
+        //STORM-3059
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setProcessingGuarantee(processingGuaranteee)
+            .setTupleTrackingEnforced(true)
+            .setRecordTranslator(new NullRecordTranslator<String, String>())
+            .build();
+        
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+        when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+            SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+        spout.nextTuple();
+        
+        verify(collectorMock, never()).emit(anyString(), anyList(), any());
+    }
+    
+    @Test
+    public void testAtMostOnceModeCanFilterNullTuples() {
+        doFilterNullTupleTest(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE);
+    }
+    
+    @Test
+    public void testNoGuaranteeModeCanFilterNullTuples() {
+        doFilterNullTupleTest(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/d70b4754/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
index 159366b..54393f7 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
@@ -18,13 +18,10 @@
 package org.apache.storm.kafka.spout;
 
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.Time;
 import org.junit.Test;
 
-import java.util.List;
 import java.util.regex.Pattern;
 
 import static org.mockito.Matchers.any;
@@ -33,6 +30,8 @@ import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
+import org.apache.storm.kafka.NullRecordTranslator;
+
 public class KafkaSpoutNullTupleTest extends KafkaSpoutAbstractTest {
 
     public KafkaSpoutNullTupleTest() {
@@ -42,11 +41,10 @@ public class KafkaSpoutNullTupleTest extends KafkaSpoutAbstractTest {
 
     @Override
     KafkaSpoutConfig<String, String> createSpoutConfig() {
-
         return KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
                 Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC))
                 .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
-                .setRecordTranslator(new NullRecordExtractor())
+                .setRecordTranslator(new NullRecordTranslator<String, String>())
                 .build();
     }
 
@@ -72,32 +70,4 @@ public class KafkaSpoutNullTupleTest extends KafkaSpoutAbstractTest {
         verifyAllMessagesCommitted(messageCount);
     }
 
-    private class NullRecordExtractor implements RecordTranslator {
-
-        @Override
-        public List<Object> apply(ConsumerRecord record) {
-            return null;
-
-        }
-
-        @Override
-        public Fields getFieldsFor(String stream) {
-            return new Fields("topic", "key", "value");
-        }
-
-        /**
-         * @return the list of streams that this will handle.
-         */
-        @Override
-        public List<String> streams() {
-            return null;
-        }
-
-        @Override
-        public Object apply(Object record) {
-            return null;
-        }
-    }
-
-
 }


[2/2] storm git commit: Merge branch 'STORM-3059-1.x' of https://github.com/srdo/storm into STORM-3059-1.x-merge

Posted by ka...@apache.org.
Merge branch 'STORM-3059-1.x' of https://github.com/srdo/storm into STORM-3059-1.x-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a4447e64
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a4447e64
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a4447e64

Branch: refs/heads/1.x-branch
Commit: a4447e64dd3db04d51fc01f4e2d9e022a200e19e
Parents: ec25999 d70b475
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon May 7 21:53:53 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon May 7 21:53:53 2018 +0900

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |  8 ++-
 .../storm/kafka/NullRecordTranslator.java       | 42 ++++++++++++++
 .../spout/KafkaSpoutMessagingGuaranteeTest.java | 59 +++++++++++++++-----
 .../kafka/spout/KafkaSpoutNullTupleTest.java    | 36 +-----------
 4 files changed, 95 insertions(+), 50 deletions(-)
----------------------------------------------------------------------