You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/05/07 12:54:16 UTC
[1/2] storm git commit: STORM-3059: Fix NPE when the processing
guarantee is not AT_LEAST_ONCE and the spout filters out a null tuple
Repository: storm
Updated Branches:
refs/heads/1.x-branch ec2599999 -> a4447e64d
STORM-3059: Fix NPE when the processing guarantee is not AT_LEAST_ONCE and the spout filters out a null tuple
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d70b4754
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d70b4754
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d70b4754
Branch: refs/heads/1.x-branch
Commit: d70b475456e2d7cb83e80b45560f48b58dffd4d2
Parents: ec25999
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sun May 6 10:16:25 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Mon May 7 08:36:38 2018 +0200
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 8 ++-
.../storm/kafka/NullRecordTranslator.java | 42 ++++++++++++++
.../spout/KafkaSpoutMessagingGuaranteeTest.java | 59 +++++++++++++++-----
.../kafka/spout/KafkaSpoutNullTupleTest.java | 36 +-----------
4 files changed, 95 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d70b4754/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index e7f5156..e8ecb3e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -503,9 +503,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
/*if a null tuple is not configured to be emitted, it should be marked as emitted and acked immediately
* to allow its offset to be committed to Kafka*/
LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", record);
- msgId.setNullTuple(true);
- offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
- ack(msgId);
+ if (isAtLeastOnceProcessing()) {
+ msgId.setNullTuple(true);
+ offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
+ ack(msgId);
+ }
}
}
return false;
http://git-wip-us.apache.org/repos/asf/storm/blob/d70b4754/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/NullRecordTranslator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/NullRecordTranslator.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/NullRecordTranslator.java
new file mode 100644
index 0000000..f2b2f98
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/NullRecordTranslator.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.tuple.Fields;
+
+public class NullRecordTranslator<K, V> implements RecordTranslator<K, V> {
+
+ @Override
+ public List<Object> apply(ConsumerRecord<K, V> record) {
+ return null;
+
+ }
+
+ @Override
+ public Fields getFieldsFor(String stream) {
+ return new Fields("topic", "key", "value");
+ }
+
+ @Override
+ public List<String> streams() {
+ return Collections.singletonList("default");
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d70b4754/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
index 082cc58..ca16237 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -21,8 +21,10 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
@@ -44,6 +46,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.NullRecordTranslator;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
import org.apache.storm.spout.SpoutOutputCollector;
@@ -64,7 +67,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
@Captor
private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
-
+
private final TopologyContext contextMock = mock(TopologyContext.class);
private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
private final Map<String, Object> conf = new HashMap<>();
@@ -96,7 +99,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
inOrder.verify(consumerMock).poll(anyLong());
inOrder.verify(consumerMock).commitSync(commitCapture.capture());
inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
-
+
CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE);
Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
assertThat(committedOffsets.get(partition).offset(), is(0L));
@@ -191,7 +194,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
.setTupleTrackingEnforced(true)
.build();
try (SimulatedTime time = new SimulatedTime()) {
- KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock,partition);
when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
@@ -204,13 +207,13 @@ public class KafkaSpoutMessagingGuaranteeTest {
assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
spout.ack(msgIdCaptor.getValue());
-
+
Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
-
+
when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.<TopicPartition, List<ConsumerRecord<String, String>>>emptyMap()));
-
+
spout.nextTuple();
-
+
verify(consumerMock, never()).commitSync(argThat(new ArgumentMatcher<Map<TopicPartition, OffsetAndMetadata>>() {
@Override
public boolean matches(Object arg) {
@@ -228,27 +231,27 @@ public class KafkaSpoutMessagingGuaranteeTest {
.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
.setTupleTrackingEnforced(true)
.build();
-
+
try (SimulatedTime time = new SimulatedTime()) {
- KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock,partition);
when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
spout.nextTuple();
-
+
when(consumerMock.position(partition)).thenReturn(1L);
ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
-
+
Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
-
+
spout.nextTuple();
-
+
verify(consumerMock).commitAsync(commitCapture.capture(), isNull(OffsetCommitCallback.class));
-
+
CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE);
Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
assertThat(committedOffsets.get(partition).offset(), is(1L));
@@ -256,4 +259,32 @@ public class KafkaSpoutMessagingGuaranteeTest {
}
}
+ private void doFilterNullTupleTest(KafkaSpoutConfig.ProcessingGuarantee processingGuaranteee) {
+ //STORM-3059
+ KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+ .setProcessingGuarantee(processingGuaranteee)
+ .setTupleTrackingEnforced(true)
+ .setRecordTranslator(new NullRecordTranslator<String, String>())
+ .build();
+
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+ when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+ SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+ spout.nextTuple();
+
+ verify(collectorMock, never()).emit(anyString(), anyList(), any());
+ }
+
+ @Test
+ public void testAtMostOnceModeCanFilterNullTuples() {
+ doFilterNullTupleTest(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE);
+ }
+
+ @Test
+ public void testNoGuaranteeModeCanFilterNullTuples() {
+ doFilterNullTupleTest(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d70b4754/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
index 159366b..54393f7 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
@@ -18,13 +18,10 @@
package org.apache.storm.kafka.spout;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Time;
import org.junit.Test;
-import java.util.List;
import java.util.regex.Pattern;
import static org.mockito.Matchers.any;
@@ -33,6 +30,8 @@ import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
+import org.apache.storm.kafka.NullRecordTranslator;
+
public class KafkaSpoutNullTupleTest extends KafkaSpoutAbstractTest {
public KafkaSpoutNullTupleTest() {
@@ -42,11 +41,10 @@ public class KafkaSpoutNullTupleTest extends KafkaSpoutAbstractTest {
@Override
KafkaSpoutConfig<String, String> createSpoutConfig() {
-
return KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC))
.setOffsetCommitPeriodMs(commitOffsetPeriodMs)
- .setRecordTranslator(new NullRecordExtractor())
+ .setRecordTranslator(new NullRecordTranslator<String, String>())
.build();
}
@@ -72,32 +70,4 @@ public class KafkaSpoutNullTupleTest extends KafkaSpoutAbstractTest {
verifyAllMessagesCommitted(messageCount);
}
- private class NullRecordExtractor implements RecordTranslator {
-
- @Override
- public List<Object> apply(ConsumerRecord record) {
- return null;
-
- }
-
- @Override
- public Fields getFieldsFor(String stream) {
- return new Fields("topic", "key", "value");
- }
-
- /**
- * @return the list of streams that this will handle.
- */
- @Override
- public List<String> streams() {
- return null;
- }
-
- @Override
- public Object apply(Object record) {
- return null;
- }
- }
-
-
}
[2/2] storm git commit: Merge branch 'STORM-3059-1.x' of
https://github.com/srdo/storm into STORM-3059-1.x-merge
Posted by ka...@apache.org.
Merge branch 'STORM-3059-1.x' of https://github.com/srdo/storm into STORM-3059-1.x-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a4447e64
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a4447e64
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a4447e64
Branch: refs/heads/1.x-branch
Commit: a4447e64dd3db04d51fc01f4e2d9e022a200e19e
Parents: ec25999 d70b475
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon May 7 21:53:53 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon May 7 21:53:53 2018 +0900
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 8 ++-
.../storm/kafka/NullRecordTranslator.java | 42 ++++++++++++++
.../spout/KafkaSpoutMessagingGuaranteeTest.java | 59 +++++++++++++++-----
.../kafka/spout/KafkaSpoutNullTupleTest.java | 36 +-----------
4 files changed, 95 insertions(+), 50 deletions(-)
----------------------------------------------------------------------