Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/20 07:45:39 UTC
[1/4] storm git commit: STORM-2087: storm-kafka-client improvements
for kafka 0.10 - Added unit tests to storm-kafka-client - Fixed bug in
kafka-spout-client that resulted in tuples not being replayed when a failure
occurred immediately after the last commit
Repository: storm
Updated Branches:
refs/heads/1.x-branch 7701bf4ea -> 55b3099e1
STORM-2087: storm-kafka-client improvements for kafka 0.10
- Added unit tests to storm-kafka-client
- Fixed bug in kafka-spout-client that resulted in tuples not being replayed
when a failure occurred immediately after the last commit
- Modified the kafka spout to continue processing and not halt upon receiving
a double ack
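The replay fix comes down to the seek target chosen for a partition whose tuples are ready to be retried: rather than seeking to the end of the partition (which skipped records that failed right after a commit), the consumer is repositioned just after the next offset that is ready to commit, or just after the last committed offset when nothing is pending. A minimal, self-contained sketch of that decision, using illustrative names rather than the spout's actual fields:

    // Sketch of the retry-seek decision (illustrative names, not the actual KafkaSpout code).
    class RetrySeekSketch {
        // Offset the consumer should seek to for a partition whose failed tuples are ready to retry.
        static long seekOffsetForRetry(Long nextCommitReadyOffset, long lastCommittedOffset) {
            if (nextCommitReadyOffset != null) {
                return nextCommitReadyOffset + 1; // resume right after what will be committed next
            }
            return lastCommittedOffset + 1;       // resume right after the last committed offset
        }

        public static void main(String[] args) {
            // A failure right after a commit at offset 41, with nothing else ready to commit:
            // the old seekToEnd() behaviour would have jumped past offset 42 and never replayed it.
            System.out.println(seekOffsetForRetry(null, 41L)); // prints 42
            System.out.println(seekOffsetForRetry(99L, 41L));  // prints 100
        }
    }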
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5c17a59e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5c17a59e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5c17a59e
Branch: refs/heads/1.x-branch
Commit: 5c17a59ebf9f5667ccfb2dc0a127044b94a0de86
Parents: c149fe1
Author: Jeff Fenchel <jf...@gmail.com>
Authored: Sat Sep 10 18:32:35 2016 -0700
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Wed Dec 14 19:01:16 2016 +0100
----------------------------------------------------------------------
external/storm-kafka-client/pom.xml | 26 ++
.../apache/storm/kafka/spout/KafkaSpout.java | 30 ++-
.../kafka/spout/SingleTopicKafkaSpoutTest.java | 240 +++++++++++++++++++
.../SingleTopicKafkaSpoutConfiguration.java | 85 +++++++
.../builders/TopicKeyValueTupleBuilder.java | 40 ++++
5 files changed, 411 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/5c17a59e/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index 6b7b7db..2d113fa 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -52,6 +52,20 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${storm.kafka.client.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!--test dependencies -->
<dependency>
@@ -65,6 +79,18 @@
<version>4.11</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>info.batey.kafka</groupId>
+ <artifactId>kafka-unit</artifactId>
+ <version>0.6</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>${log4j-over-slf4j.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/storm/blob/5c17a59e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index e37d549..6528ae9 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -77,7 +77,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private KafkaSpoutStreams kafkaSpoutStreams; // Object that wraps all the logic to declare output fields and emit tuples
private transient KafkaSpoutTuplesBuilder<K, V> tuplesBuilder; // Object that contains the logic to build tuples for each ConsumerRecord
- private transient Map<TopicPartition, OffsetEntry> acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
+ transient Map<TopicPartition, OffsetEntry> acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
private transient Set<KafkaSpoutMessageId> emitted; // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed
private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed
@@ -266,19 +266,22 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
if (offsetAndMeta != null) {
kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle
} else {
- kafkaConsumer.seekToEnd(toArrayList(rtp)); // Seek to last committed offset
+ kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1); // Seek to the offset just after the last committed offset
}
}
}
// ======== emit =========
private void emit() {
- emitTupleIfNotEmitted(waitingToEmit.next());
- waitingToEmit.remove();
+ while(!emitTupleIfNotEmitted(waitingToEmit.next()) && waitingToEmit.hasNext()) {
+ waitingToEmit.remove();
+ }
}
- // emits one tuple per record
- private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
+
+ //Emits one tuple per record
+ //@return true if tuple was emitted
+ private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
@@ -295,7 +298,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
retryService.remove(msgId); // re-emitted hence remove from failed
}
LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
+ return true;
}
+ return false;
}
private void commitOffsetsForAckedTuples() {
@@ -451,7 +456,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
/**
* This class is not thread safe
*/
- private class OffsetEntry {
+ class OffsetEntry {
private final TopicPartition tp;
private final long initialFetchOffset; /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
* Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
@@ -479,7 +484,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata
for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap
- if ((currOffset = currAckedMsg.offset()) == initialFetchOffset || currOffset == nextCommitOffset + 1) { // found the next offset to commit
+ if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) { // found the next offset to commit
found = true;
nextCommitMsg = currAckedMsg;
nextCommitOffset = currOffset;
@@ -487,8 +492,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
break;
} else {
- LOG.debug("topic-partition [{}] has unexpected offset [{}].", tp, currOffset);
- break;
+ //Received a redundant ack. Ignore and continue processing.
+ LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed Offset [{}]",
+ tp, currOffset, committedOffset);
}
}
@@ -532,6 +538,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
LOG.trace("{}", this);
}
+ long getCommittedOffset() {
+ return committedOffset;
+ }
+
public boolean isEmpty() {
return ackedMsgs.isEmpty();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/5c17a59e/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
new file mode 100644
index 0000000..8fa7b80
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import info.batey.kafka.unit.KafkaUnitRule;
+import kafka.producer.KeyedMessage;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Values;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+import java.util.stream.IntStream;
+import static org.mockito.Mockito.*;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.*;
+
+public class SingleTopicKafkaSpoutTest {
+
+ private class SpoutContext {
+ public KafkaSpout<String, String> spout;
+ public SpoutOutputCollector collector;
+
+ public SpoutContext(KafkaSpout<String, String> spout,
+ SpoutOutputCollector collector) {
+ this.spout = spout;
+ this.collector = collector;
+ }
+ }
+
+ @Rule
+ public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+ void populateTopicData(String topicName, int msgCount) {
+ kafkaUnitRule.getKafkaUnit().createTopic(topicName);
+
+ IntStream.range(0, msgCount).forEach(value -> {
+ KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(
+ topicName, Integer.toString(value),
+ Integer.toString(value));
+
+ kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
+ });
+ }
+
+ SpoutContext initializeSpout(int msgCount) {
+ populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
+ int kafkaPort = kafkaUnitRule.getKafkaPort();
+
+ TopologyContext topology = mock(TopologyContext.class);
+ SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+ Map conf = mock(Map.class);
+
+ KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), kafkaPort));
+ spout.open(conf, topology, collector);
+ spout.activate();
+ return new SpoutContext(spout, collector);
+ }
+ /*
+ * Asserts that the next possible offset to commit or the committed offset is the provided offset.
+ * An offset that is ready to be committed is not guaranteed to be already committed.
+ */
+ private void assertOffsetCommitted(int offset, KafkaSpout.OffsetEntry entry) {
+
+ boolean currentOffsetMatch = entry.getCommittedOffset() == offset;
+ OffsetAndMetadata nextOffset = entry.findNextCommitOffset();
+ boolean nextOffsetMatch = nextOffset != null && nextOffset.offset() == offset;
+ assertTrue("Next offset: " +
+ entry.findNextCommitOffset() +
+ " OR current offset: " +
+ entry.getCommittedOffset() +
+ " must equal desired offset: " +
+ offset,
+ currentOffsetMatch | nextOffsetMatch);
+ }
+
+ @Test
+ public void shouldContinueWithSlowDoubleAcks() throws Exception {
+ int messageCount = 20;
+ SpoutContext context = initializeSpout(messageCount);
+
+ //play 1st tuple
+ ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class);
+ context.spout.nextTuple();
+ verify(context.collector).emit(anyObject(), anyObject(), messageIdToDoubleAck.capture());
+ context.spout.ack(messageIdToDoubleAck.getValue());
+
+ IntStream.range(0, messageCount/2).forEach(value -> {
+ context.spout.nextTuple();
+ });
+
+ context.spout.ack(messageIdToDoubleAck.getValue());
+
+ IntStream.range(0, messageCount).forEach(value -> {
+ context.spout.nextTuple();
+ });
+
+ ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
+
+ verify(context.collector, times(messageCount)).emit(
+ eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+ anyObject(),
+ remainingIds.capture());
+ remainingIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+
+ context.spout.acked.values().forEach(item -> {
+ assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
+ });
+ }
+
+ @Test
+ public void shouldEmitAllMessages() throws Exception {
+ int messageCount = 10;
+ SpoutContext context = initializeSpout(messageCount);
+
+
+ IntStream.range(0, messageCount).forEach(value -> {
+ context.spout.nextTuple();
+ ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class);
+ verify(context.collector).emit(
+ eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+ eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC,
+ Integer.toString(value),
+ Integer.toString(value))),
+ messageId.capture());
+ context.spout.ack(messageId.getValue());
+ reset(context.collector);
+ });
+
+ context.spout.acked.values().forEach(item -> {
+ assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
+ });
+ }
+
+ @Test
+ public void shouldReplayInOrderFailedMessages() throws Exception {
+ int messageCount = 10;
+ SpoutContext context = initializeSpout(messageCount);
+
+ //play and ack 1 tuple
+ ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class);
+ context.spout.nextTuple();
+ verify(context.collector).emit(anyObject(), anyObject(), messageIdAcked.capture());
+ context.spout.ack(messageIdAcked.getValue());
+ reset(context.collector);
+
+ //play and fail 1 tuple
+ ArgumentCaptor<Object> messageIdFailed = ArgumentCaptor.forClass(Object.class);
+ context.spout.nextTuple();
+ verify(context.collector).emit(anyObject(), anyObject(), messageIdFailed.capture());
+ context.spout.fail(messageIdFailed.getValue());
+ reset(context.collector);
+
+ //pause so that failed tuples will be retried
+ Thread.sleep(200);
+
+
+ //allow for some calls to nextTuple() to fail to emit a tuple
+ IntStream.range(0, messageCount + 5).forEach(value -> {
+ context.spout.nextTuple();
+ });
+
+ ArgumentCaptor<Object> remainingMessageIds = ArgumentCaptor.forClass(Object.class);
+
+ //1 message replayed, messageCount - 2 messages emitted for the first time
+ verify(context.collector, times(messageCount - 1)).emit(
+ eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+ anyObject(),
+ remainingMessageIds.capture());
+ remainingMessageIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+
+ context.spout.acked.values().forEach(item -> {
+ assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
+ });
+ }
+
+ @Test
+ public void shouldReplayFirstTupleFailedOutOfOrder() throws Exception {
+ int messageCount = 10;
+ SpoutContext context = initializeSpout(messageCount);
+
+
+ //play 1st tuple
+ ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class);
+ context.spout.nextTuple();
+ verify(context.collector).emit(anyObject(), anyObject(), messageIdToFail.capture());
+ reset(context.collector);
+
+ //play 2nd tuple
+ ArgumentCaptor<Object> messageIdToAck = ArgumentCaptor.forClass(Object.class);
+ context.spout.nextTuple();
+ verify(context.collector).emit(anyObject(), anyObject(), messageIdToAck.capture());
+ reset(context.collector);
+
+ //ack 2nd tuple
+ context.spout.ack(messageIdToAck.getValue());
+ //fail 1st tuple
+ context.spout.fail(messageIdToFail.getValue());
+
+ //pause so that failed tuples will be retried
+ Thread.sleep(200);
+
+ //allow for some calls to nextTuple() to fail to emit a tuple
+ IntStream.range(0, messageCount + 5).forEach(value -> {
+ context.spout.nextTuple();
+ });
+
+ ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
+ //1 message replayed, messageCount - 2 messages emitted for the first time
+ verify(context.collector, times(messageCount - 1)).emit(
+ eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+ anyObject(),
+ remainingIds.capture());
+ remainingIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+
+ context.spout.acked.values().forEach(item -> {
+ assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
+ });
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/5c17a59e/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
new file mode 100644
index 0000000..baece93
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.builders;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.*;
+import org.apache.storm.kafka.spout.test.KafkaSpoutTestBolt;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+public class SingleTopicKafkaSpoutConfiguration {
+ public static final String STREAM = "test_stream";
+ public static final String TOPIC = "test";
+
+ public static Config getConfig() {
+ Config config = new Config();
+ config.setDebug(true);
+ return config;
+ }
+
+ public static StormTopology getTopologyKafkaSpout(int port) {
+ final TopologyBuilder tp = new TopologyBuilder();
+ tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), port)), 1);
+ tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
+ return tp.createTopology();
+ }
+
+ public static KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
+ return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), getRetryService())
+ .setOffsetCommitPeriodMs(10_000)
+ .setFirstPollOffsetStrategy(EARLIEST)
+ .setMaxUncommittedOffsets(250)
+ .setPollTimeoutMs(1000)
+ .build();
+ }
+
+ protected static KafkaSpoutRetryService getRetryService() {
+ return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(0),
+ KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
+ }
+
+ protected static Map<String,Object> getKafkaConsumerProps(int port) {
+ Map<String, Object> props = new HashMap<>();
+ props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:" + port);
+ props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
+ props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("max.poll.records", "5");
+ return props;
+ }
+
+ protected static KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
+ return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
+ new TopicKeyValueTupleBuilder<String, String>(TOPIC))
+ .build();
+ }
+
+ public static KafkaSpoutStreams getKafkaSpoutStreams() {
+ final Fields outputFields = new Fields("topic", "key", "value");
+ return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAM, new String[]{TOPIC}) // contents of topics test sent to test_stream
+ .build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/5c17a59e/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
new file mode 100644
index 0000000..4f20b58
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.builders;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+public class TopicKeyValueTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
+ /**
+ * @param topics list of topics that use this implementation to build tuples
+ */
+ public TopicKeyValueTupleBuilder(String... topics) {
+ super(topics);
+ }
+
+ @Override
+ public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
+ return new Values(consumerRecord.topic(),
+ consumerRecord.key(),
+ consumerRecord.value());
+ }
+}
[3/4] storm git commit: Merge branch 'STORM-2087-1.x' of
https://github.com/srdo/storm into STORM-2087-1.x-merge
Posted by ka...@apache.org.
Merge branch 'STORM-2087-1.x' of https://github.com/srdo/storm into STORM-2087-1.x-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8310f593
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8310f593
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8310f593
Branch: refs/heads/1.x-branch
Commit: 8310f593c239577e65c2e955385320b95ed825f7
Parents: 7701bf4 e47ccec
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Dec 20 16:42:54 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Dec 20 16:42:54 2016 +0900
----------------------------------------------------------------------
external/storm-kafka-client/pom.xml | 26 ++
.../apache/storm/kafka/spout/KafkaSpout.java | 30 ++-
.../kafka/spout/SingleTopicKafkaSpoutTest.java | 245 +++++++++++++++++++
.../SingleTopicKafkaSpoutConfiguration.java | 85 +++++++
.../builders/TopicKeyValueTupleBuilder.java | 40 +++
5 files changed, 416 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
[4/4] storm git commit: STORM-2087: CHANGELOG
Posted by ka...@apache.org.
STORM-2087: CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/55b3099e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/55b3099e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/55b3099e
Branch: refs/heads/1.x-branch
Commit: 55b3099e1fdc297f235f57560d21ed160632835e
Parents: 8310f59
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Dec 20 16:45:15 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Dec 20 16:45:15 2016 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/55b3099e/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 99f58d7..3661e45 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.0
+ * STORM-2087: Storm-kafka-client: Failed tuples are not always replayed
* STORM-2238: Add Timestamp extractor for windowed bolt
* STORM-2246: Logviewer download link has urlencoding on part of the URL
* STORM-2235: Introduce new option: 'add remote repositories' for dependency resolver
[2/4] storm git commit: STORM-2087: Make tests Java 7 compatible
Posted by ka...@apache.org.
STORM-2087: Make tests Java 7 compatible
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e47ccecb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e47ccecb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e47ccecb
Branch: refs/heads/1.x-branch
Commit: e47ccecba855889b75da40a3cceb190587552aef
Parents: 5c17a59
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Wed Dec 14 19:29:48 2016 +0100
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Wed Dec 14 20:33:02 2016 +0100
----------------------------------------------------------------------
.../kafka/spout/SingleTopicKafkaSpoutTest.java | 77 +++++++++++---------
1 file changed, 41 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
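The 1.x branch still builds against Java 7, so the tests added in the previous commit cannot use java.util.stream or lambda expressions; the diff below rewrites them as plain loops. The pattern in isolation (an illustrative example, not lines taken from the test file):

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative example of the rewrite applied below (not lines taken from the test file).
    class Java7LoopSketch {
        public static void main(String[] args) {
            int msgCount = 3;
            List<String> messageIds = new ArrayList<>();
            messageIds.add("id-0");
            messageIds.add("id-1");

            // Java 8 style used by the original test code:
            //   IntStream.range(0, msgCount).forEach(value -> spout.nextTuple());
            //   ids.getAllValues().iterator().forEachRemaining(spout::ack);

            // Java 7 equivalents, matching this commit:
            for (int i = 0; i < msgCount; i++) {
                System.out.println("nextTuple call " + i);
            }
            for (String id : messageIds) {
                System.out.println("ack " + id);
            }
        }
    }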
http://git-wip-us.apache.org/repos/asf/storm/blob/e47ccecb/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index 8fa7b80..6983160 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -31,7 +31,6 @@ import org.mockito.ArgumentCaptor;
import static org.junit.Assert.*;
import java.util.Map;
-import java.util.stream.IntStream;
import static org.mockito.Mockito.*;
import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.*;
@@ -54,13 +53,13 @@ public class SingleTopicKafkaSpoutTest {
void populateTopicData(String topicName, int msgCount) {
kafkaUnitRule.getKafkaUnit().createTopic(topicName);
- IntStream.range(0, msgCount).forEach(value -> {
+ for (int i = 0; i < msgCount; i++){
KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(
- topicName, Integer.toString(value),
- Integer.toString(value));
+ topicName, Integer.toString(i),
+ Integer.toString(i));
kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
- });
+ };
}
SpoutContext initializeSpout(int msgCount) {
@@ -102,30 +101,32 @@ public class SingleTopicKafkaSpoutTest {
//play 1st tuple
ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class);
context.spout.nextTuple();
- verify(context.collector).emit(anyObject(), anyObject(), messageIdToDoubleAck.capture());
+ verify(context.collector).emit(anyString(), anyList(), messageIdToDoubleAck.capture());
context.spout.ack(messageIdToDoubleAck.getValue());
- IntStream.range(0, messageCount/2).forEach(value -> {
+ for (int i = 0; i < messageCount/2; i++) {
context.spout.nextTuple();
- });
+ };
context.spout.ack(messageIdToDoubleAck.getValue());
- IntStream.range(0, messageCount).forEach(value -> {
+ for (int i = 0; i < messageCount; i++) {
context.spout.nextTuple();
- });
+ };
ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
verify(context.collector, times(messageCount)).emit(
eq(SingleTopicKafkaSpoutConfiguration.STREAM),
- anyObject(),
+ anyList(),
remainingIds.capture());
- remainingIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+ for (Object id : remainingIds.getAllValues()) {
+ context.spout.ack(id);
+ }
- context.spout.acked.values().forEach(item -> {
+ for(Object item : context.spout.acked.values()) {
assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
- });
+ };
}
@Test
@@ -134,22 +135,22 @@ public class SingleTopicKafkaSpoutTest {
SpoutContext context = initializeSpout(messageCount);
- IntStream.range(0, messageCount).forEach(value -> {
+ for (int i = 0; i < messageCount; i++) {
context.spout.nextTuple();
ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class);
verify(context.collector).emit(
eq(SingleTopicKafkaSpoutConfiguration.STREAM),
eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC,
- Integer.toString(value),
- Integer.toString(value))),
+ Integer.toString(i),
+ Integer.toString(i))),
messageId.capture());
context.spout.ack(messageId.getValue());
reset(context.collector);
- });
+ };
- context.spout.acked.values().forEach(item -> {
+ for (Object item : context.spout.acked.values()) {
assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
- });
+ };
}
@Test
@@ -160,14 +161,14 @@ public class SingleTopicKafkaSpoutTest {
//play and ack 1 tuple
ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class);
context.spout.nextTuple();
- verify(context.collector).emit(anyObject(), anyObject(), messageIdAcked.capture());
+ verify(context.collector).emit(anyString(), anyList(), messageIdAcked.capture());
context.spout.ack(messageIdAcked.getValue());
reset(context.collector);
//play and fail 1 tuple
ArgumentCaptor<Object> messageIdFailed = ArgumentCaptor.forClass(Object.class);
context.spout.nextTuple();
- verify(context.collector).emit(anyObject(), anyObject(), messageIdFailed.capture());
+ verify(context.collector).emit(anyString(), anyList(), messageIdFailed.capture());
context.spout.fail(messageIdFailed.getValue());
reset(context.collector);
@@ -176,22 +177,24 @@ public class SingleTopicKafkaSpoutTest {
//allow for some calls to nextTuple() to fail to emit a tuple
- IntStream.range(0, messageCount + 5).forEach(value -> {
+ for (int i = 0; i < messageCount + 5; i++) {
context.spout.nextTuple();
- });
+ };
ArgumentCaptor<Object> remainingMessageIds = ArgumentCaptor.forClass(Object.class);
//1 message replayed, messageCount - 2 messages emitted for the first time
verify(context.collector, times(messageCount - 1)).emit(
eq(SingleTopicKafkaSpoutConfiguration.STREAM),
- anyObject(),
+ anyList(),
remainingMessageIds.capture());
- remainingMessageIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+ for (Object id : remainingMessageIds.getAllValues()) {
+ context.spout.ack(id);
+ }
- context.spout.acked.values().forEach(item -> {
+ for (Object item : context.spout.acked.values()) {
assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
- });
+ };
}
@Test
@@ -203,13 +206,13 @@ public class SingleTopicKafkaSpoutTest {
//play 1st tuple
ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class);
context.spout.nextTuple();
- verify(context.collector).emit(anyObject(), anyObject(), messageIdToFail.capture());
+ verify(context.collector).emit(anyString(), anyList(), messageIdToFail.capture());
reset(context.collector);
//play 2nd tuple
ArgumentCaptor<Object> messageIdToAck = ArgumentCaptor.forClass(Object.class);
context.spout.nextTuple();
- verify(context.collector).emit(anyObject(), anyObject(), messageIdToAck.capture());
+ verify(context.collector).emit(anyString(), anyList(), messageIdToAck.capture());
reset(context.collector);
//ack 2nd tuple
@@ -221,20 +224,22 @@ public class SingleTopicKafkaSpoutTest {
Thread.sleep(200);
//allow for some calls to nextTuple() to fail to emit a tuple
- IntStream.range(0, messageCount + 5).forEach(value -> {
+ for (int i = 0; i < messageCount + 5; i++) {
context.spout.nextTuple();
- });
+ };
ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
//1 message replayed, messageCount - 2 messages emitted for the first time
verify(context.collector, times(messageCount - 1)).emit(
eq(SingleTopicKafkaSpoutConfiguration.STREAM),
- anyObject(),
+ anyList(),
remainingIds.capture());
- remainingIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+ for (Object id : remainingIds.getAllValues()) {
+ context.spout.ack(id);
+ };
- context.spout.acked.values().forEach(item -> {
+ for (Object item : context.spout.acked.values()) {
assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
- });
+ };
}
}
\ No newline at end of file