Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/20 07:45:39 UTC

[1/4] storm git commit: STORM-2087: storm-kafka-client improvements for kafka 0.10 - Added unit tests to storm-kafka-client - Fixed bug in kafka-spout-client that resulted in tuples not being replayed when a failure occurred immediately after the last commit

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 7701bf4ea -> 55b3099e1


STORM-2087: storm-kafka-client improvements for kafka 0.10
 - Added unit tests to storm-kafka-client
 - Fixed bug in kafka-spout-client that resulted in tuples not being replayed
    when a failure occurred immediately after the last commit
 - Modified the kafka spout to continue processing and not halt upon receiving
    a double ack
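
The core of the replay fix is where the consumer is repositioned after a
failure: seeking to the end of the partition discards any uncommitted
offsets that still need to be replayed, while seeking just past the last
committed offset re-fetches them on the next poll. Below is a minimal
sketch of the corrected behavior, assuming a per-partition field tracking
the last committed offset (names here are illustrative, not the actual
KafkaSpout internals):

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    class SeekFixSketch {
        // Reposition a partition so failed, uncommitted tuples are
        // re-fetched by the next poll(). lastCommittedOffset is assumed to
        // mirror OffsetEntry.committedOffset for this partition.
        static void seekForReplay(KafkaConsumer<String, String> consumer,
                                  TopicPartition partition,
                                  long lastCommittedOffset) {
            // Old (buggy) behavior: jumping to the log end silently skips
            // every offset between the last commit and the log end, so
            // tuples that failed right after a commit were never replayed:
            // consumer.seekToEnd(Collections.singletonList(partition));

            // Fixed behavior: resume just past the last committed offset so
            // the failed records come back from the broker for replay.
            consumer.seek(partition, lastCommittedOffset + 1);
        }
    }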


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5c17a59e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5c17a59e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5c17a59e

Branch: refs/heads/1.x-branch
Commit: 5c17a59ebf9f5667ccfb2dc0a127044b94a0de86
Parents: c149fe1
Author: Jeff Fenchel <jf...@gmail.com>
Authored: Sat Sep 10 18:32:35 2016 -0700
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Wed Dec 14 19:01:16 2016 +0100

----------------------------------------------------------------------
 external/storm-kafka-client/pom.xml             |  26 ++
 .../apache/storm/kafka/spout/KafkaSpout.java    |  30 ++-
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  | 240 +++++++++++++++++++
 .../SingleTopicKafkaSpoutConfiguration.java     |  85 +++++++
 .../builders/TopicKeyValueTupleBuilder.java     |  40 ++++
 5 files changed, 411 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5c17a59e/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index 6b7b7db..2d113fa 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -52,6 +52,20 @@
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
             <version>${storm.kafka.client.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <!--test dependencies -->
         <dependency>
@@ -65,6 +79,18 @@
             <version>4.11</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>info.batey.kafka</groupId>
+            <artifactId>kafka-unit</artifactId>
+            <version>0.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <version>${log4j-over-slf4j.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/storm/blob/5c17a59e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index e37d549..6528ae9 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -77,7 +77,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private KafkaSpoutStreams kafkaSpoutStreams;                        // Object that wraps all the logic to declare output fields and emit tuples
     private transient KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;      // Object that contains the logic to build tuples for each ConsumerRecord
 
-    private transient Map<TopicPartition, OffsetEntry> acked;           // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
+    transient Map<TopicPartition, OffsetEntry> acked;           // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
     private transient Set<KafkaSpoutMessageId> emitted;                 // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed
     private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;         // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
     private transient long numUncommittedOffsets;                       // Number of offsets that have been polled and emitted but not yet been committed
@@ -266,19 +266,22 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             if (offsetAndMeta != null) {
                 kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
             } else {
-                kafkaConsumer.seekToEnd(toArrayList(rtp));    // Seek to last committed offset
+                kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);    // Seek to the offset just after the last committed offset
             }
         }
     }
 
     // ======== emit  =========
     private void emit() {
-        emitTupleIfNotEmitted(waitingToEmit.next());
-        waitingToEmit.remove();
+        while (!emitTupleIfNotEmitted(waitingToEmit.next()) && waitingToEmit.hasNext()) {
+            waitingToEmit.remove();
+        }
     }
 
-    // emits one tuple per record
-    private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
+
+    // Emits one tuple per record
+    // @return true if tuple was emitted
+    private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
         final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
         final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
 
@@ -295,7 +298,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 retryService.remove(msgId);  // re-emitted hence remove from failed
             }
             LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
+            return true;
         }
+        return false;
     }
 
     private void commitOffsetsForAckedTuples() {
@@ -451,7 +456,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     /**
      * This class is not thread safe
      */
-    private class OffsetEntry {
+    class OffsetEntry {
         private final TopicPartition tp;
         private final long initialFetchOffset;  /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
                                                  * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
@@ -479,7 +484,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience variable to make it faster to create OffsetAndMetadata
 
             for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
-                if ((currOffset = currAckedMsg.offset()) == initialFetchOffset || currOffset == nextCommitOffset + 1) {            // found the next offset to commit
+                if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) {            // found the next offset to commit
                     found = true;
                     nextCommitMsg = currAckedMsg;
                     nextCommitOffset = currOffset;
@@ -487,8 +492,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                     LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
                     break;
                 } else {
-                    LOG.debug("topic-partition [{}] has unexpected offset [{}].", tp, currOffset);
-                    break;
+                    //Received a redundant ack. Ignore and continue processing.
+                    LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed offset [{}]",
+                            tp, currOffset, committedOffset);
                 }
             }
 
@@ -532,6 +538,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.trace("{}", this);
         }
 
+        long getCommittedOffset() {
+            return committedOffset;
+        }
+
         public boolean isEmpty() {
             return ackedMsgs.isEmpty();
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/5c17a59e/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
new file mode 100644
index 0000000..8fa7b80
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import info.batey.kafka.unit.KafkaUnitRule;
+import kafka.producer.KeyedMessage;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Values;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+import java.util.stream.IntStream;
+import static org.mockito.Mockito.*;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.*;
+
+public class SingleTopicKafkaSpoutTest {
+
+    private class SpoutContext {
+        public KafkaSpout<String, String> spout;
+        public SpoutOutputCollector collector;
+
+        public SpoutContext(KafkaSpout<String, String> spout,
+                            SpoutOutputCollector collector) {
+            this.spout = spout;
+            this.collector = collector;
+        }
+    }
+
+    @Rule
+    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+    void populateTopicData(String topicName, int msgCount) {
+        kafkaUnitRule.getKafkaUnit().createTopic(topicName);
+
+        IntStream.range(0, msgCount).forEach(value -> {
+            KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(
+                    topicName, Integer.toString(value),
+                    Integer.toString(value));
+
+            kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
+        });
+    }
+
+    SpoutContext initializeSpout(int msgCount) {
+        populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
+        int kafkaPort = kafkaUnitRule.getKafkaPort();
+
+        TopologyContext topology = mock(TopologyContext.class);
+        SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+        Map conf = mock(Map.class);
+
+        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), kafkaPort));
+        spout.open(conf, topology, collector);
+        spout.activate();
+        return new SpoutContext(spout, collector);
+    }
+    /*
+     * Asserts that the next possible offset to commit or the committed offset is the provided offset.
+     * An offset that is ready to be committed is not guaranteed to be already committed.
+     */
+    private void assertOffsetCommitted(int offset, KafkaSpout.OffsetEntry entry) {
+
+        boolean currentOffsetMatch = entry.getCommittedOffset() == offset;
+        OffsetAndMetadata nextOffset = entry.findNextCommitOffset();
+        boolean nextOffsetMatch = nextOffset != null && nextOffset.offset() == offset;
+        assertTrue("Next offset: " +
+                        entry.findNextCommitOffset() +
+                        " OR current offset: " +
+                        entry.getCommittedOffset() +
+                        " must equal desired offset: " +
+                        offset,
+                currentOffsetMatch || nextOffsetMatch);
+    }
+
+    @Test
+    public void shouldContinueWithSlowDoubleAcks() throws Exception {
+        int messageCount = 20;
+        SpoutContext context = initializeSpout(messageCount);
+
+        //play 1st tuple
+        ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class);
+        context.spout.nextTuple();
+        verify(context.collector).emit(anyObject(), anyObject(), messageIdToDoubleAck.capture());
+        context.spout.ack(messageIdToDoubleAck.getValue());
+
+        IntStream.range(0, messageCount/2).forEach(value -> {
+            context.spout.nextTuple();
+        });
+
+        context.spout.ack(messageIdToDoubleAck.getValue());
+
+        IntStream.range(0, messageCount).forEach(value -> {
+            context.spout.nextTuple();
+        });
+
+        ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
+
+        verify(context.collector, times(messageCount)).emit(
+                eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+                anyObject(),
+                remainingIds.capture());
+        remainingIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+
+        context.spout.acked.values().forEach(item -> {
+            assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
+        });
+    }
+
+    @Test
+    public void shouldEmitAllMessages() throws Exception {
+        int messageCount = 10;
+        SpoutContext context = initializeSpout(messageCount);
+
+
+        IntStream.range(0, messageCount).forEach(value -> {
+            context.spout.nextTuple();
+            ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class);
+            verify(context.collector).emit(
+                    eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+                    eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC,
+                            Integer.toString(value),
+                            Integer.toString(value))),
+            messageId.capture());
+            context.spout.ack(messageId.getValue());
+            reset(context.collector);
+        });
+
+        context.spout.acked.values().forEach(item -> {
+            assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
+        });
+    }
+
+    @Test
+    public void shouldReplayInOrderFailedMessages() throws Exception {
+        int messageCount = 10;
+        SpoutContext context = initializeSpout(messageCount);
+
+        //play and ack 1 tuple
+        ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class);
+        context.spout.nextTuple();
+        verify(context.collector).emit(anyObject(), anyObject(), messageIdAcked.capture());
+        context.spout.ack(messageIdAcked.getValue());
+        reset(context.collector);
+
+        //play and fail 1 tuple
+        ArgumentCaptor<Object> messageIdFailed = ArgumentCaptor.forClass(Object.class);
+        context.spout.nextTuple();
+        verify(context.collector).emit(anyObject(), anyObject(), messageIdFailed.capture());
+        context.spout.fail(messageIdFailed.getValue());
+        reset(context.collector);
+
+        //pause so that failed tuples will be retried
+        Thread.sleep(200);
+
+
+        //allow for some calls to nextTuple() to fail to emit a tuple
+        IntStream.range(0, messageCount + 5).forEach(value -> {
+            context.spout.nextTuple();
+        });
+
+        ArgumentCaptor<Object> remainingMessageIds = ArgumentCaptor.forClass(Object.class);
+
+        //1 message replayed, messageCount - 2 messages emitted for the first time
+        verify(context.collector, times(messageCount - 1)).emit(
+                eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+                anyObject(),
+                remainingMessageIds.capture());
+        remainingMessageIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+
+        context.spout.acked.values().forEach(item -> {
+            assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
+        });
+    }
+
+    @Test
+    public void shouldReplayFirstTupleFailedOutOfOrder() throws Exception {
+        int messageCount = 10;
+        SpoutContext context = initializeSpout(messageCount);
+
+
+        //play 1st tuple
+        ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class);
+        context.spout.nextTuple();
+        verify(context.collector).emit(anyObject(), anyObject(), messageIdToFail.capture());
+        reset(context.collector);
+
+        //play 2nd tuple
+        ArgumentCaptor<Object> messageIdToAck = ArgumentCaptor.forClass(Object.class);
+        context.spout.nextTuple();
+        verify(context.collector).emit(anyObject(), anyObject(), messageIdToAck.capture());
+        reset(context.collector);
+
+        //ack 2nd tuple
+        context.spout.ack(messageIdToAck.getValue());
+        //fail 1st tuple
+        context.spout.fail(messageIdToFail.getValue());
+
+        //pause so that failed tuples will be retried
+        Thread.sleep(200);
+
+        //allow for some calls to nextTuple() to fail to emit a tuple
+        IntStream.range(0, messageCount + 5).forEach(value -> {
+            context.spout.nextTuple();
+        });
+
+        ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
+        //1 message replayed, messageCount - 2 messages emitted for the first time
+        verify(context.collector, times(messageCount - 1)).emit(
+                eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+                anyObject(),
+                remainingIds.capture());
+        remainingIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+
+        context.spout.acked.values().forEach(item -> {
+            assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
+        });
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/5c17a59e/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
new file mode 100644
index 0000000..baece93
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout.builders;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.*;
+import org.apache.storm.kafka.spout.test.KafkaSpoutTestBolt;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+public class SingleTopicKafkaSpoutConfiguration {
+    public static final String STREAM = "test_stream";
+    public static final String TOPIC = "test";
+
+    public static Config getConfig() {
+        Config config = new Config();
+        config.setDebug(true);
+        return config;
+    }
+
+    public static StormTopology getTopologyKafkaSpout(int port) {
+        final TopologyBuilder tp = new TopologyBuilder();
+        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), port)), 1);
+        tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
+        return tp.createTopology();
+    }
+
+    public static KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
+        return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), getRetryService())
+                .setOffsetCommitPeriodMs(10_000)
+                .setFirstPollOffsetStrategy(EARLIEST)
+                .setMaxUncommittedOffsets(250)
+                .setPollTimeoutMs(1000)
+                .build();
+    }
+
+    protected static KafkaSpoutRetryService getRetryService() {
+        return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(0),
+                KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
+    }
+
+    protected static Map<String,Object> getKafkaConsumerProps(int port) {
+        Map<String, Object> props = new HashMap<>();
+        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:" + port);
+        props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
+        props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("max.poll.records", "5");
+        return props;
+    }
+
+    protected static KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
+        return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
+                new TopicKeyValueTupleBuilder<String, String>(TOPIC))
+                .build();
+    }
+
+    public static KafkaSpoutStreams getKafkaSpoutStreams() {
+        final Fields outputFields = new Fields("topic", "key", "value");
+        return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAM, new String[]{TOPIC})  // contents of topics test sent to test_stream
+                .build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5c17a59e/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
new file mode 100644
index 0000000..4f20b58
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout.builders;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+public class TopicKeyValueTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
+    /**
+     * @param topics list of topics that use this implementation to build tuples
+     */
+    public TopicKeyValueTupleBuilder(String... topics) {
+        super(topics);
+    }
+
+    @Override
+    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
+        return new Values(consumerRecord.topic(),
+                consumerRecord.key(),
+                consumerRecord.value());
+    }
+}
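
As a concrete example of what this builder produces: a ConsumerRecord from
topic "test" with key "3" and value "3" becomes the tuple
("test", "3", "3"), lining up with the ("topic", "key", "value") output
fields declared in SingleTopicKafkaSpoutConfiguration.getKafkaSpoutStreams().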


[3/4] storm git commit: Merge branch 'STORM-2087-1.x' of https://github.com/srdo/storm into STORM-2087-1.x-merge

Posted by ka...@apache.org.
Merge branch 'STORM-2087-1.x' of https://github.com/srdo/storm into STORM-2087-1.x-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8310f593
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8310f593
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8310f593

Branch: refs/heads/1.x-branch
Commit: 8310f593c239577e65c2e955385320b95ed825f7
Parents: 7701bf4 e47ccec
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Dec 20 16:42:54 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Dec 20 16:42:54 2016 +0900

----------------------------------------------------------------------
 external/storm-kafka-client/pom.xml             |  26 ++
 .../apache/storm/kafka/spout/KafkaSpout.java    |  30 ++-
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  | 245 +++++++++++++++++++
 .../SingleTopicKafkaSpoutConfiguration.java     |  85 +++++++
 .../builders/TopicKeyValueTupleBuilder.java     |  40 +++
 5 files changed, 416 insertions(+), 10 deletions(-)
----------------------------------------------------------------------



[4/4] storm git commit: STORM-2087: CHANGELOG

Posted by ka...@apache.org.
STORM-2087: CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/55b3099e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/55b3099e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/55b3099e

Branch: refs/heads/1.x-branch
Commit: 55b3099e1fdc297f235f57560d21ed160632835e
Parents: 8310f59
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Dec 20 16:45:15 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Dec 20 16:45:15 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/55b3099e/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 99f58d7..3661e45 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.0
+ * STORM-2087: Storm-kafka-client: Failed tuples are not always replayed
  * STORM-2238: Add Timestamp extractor for windowed bolt
  * STORM-2246: Logviewer download link has urlencoding on part of the URL
  * STORM-2235: Introduce new option: 'add remote repositories' for dependency resolver


[2/4] storm git commit: STORM-2087: Make tests Java 7 compatible

Posted by ka...@apache.org.
STORM-2087: Make tests Java 7 compatible


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e47ccecb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e47ccecb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e47ccecb

Branch: refs/heads/1.x-branch
Commit: e47ccecba855889b75da40a3cceb190587552aef
Parents: 5c17a59
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Wed Dec 14 19:29:48 2016 +0100
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Wed Dec 14 20:33:02 2016 +0100

----------------------------------------------------------------------
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  | 77 +++++++++++---------
 1 file changed, 41 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e47ccecb/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index 8fa7b80..6983160 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -31,7 +31,6 @@ import org.mockito.ArgumentCaptor;
 import static org.junit.Assert.*;
 
 import java.util.Map;
-import java.util.stream.IntStream;
 import static org.mockito.Mockito.*;
 import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.*;
 
@@ -54,13 +53,13 @@ public class SingleTopicKafkaSpoutTest {
     void populateTopicData(String topicName, int msgCount) {
         kafkaUnitRule.getKafkaUnit().createTopic(topicName);
 
-        IntStream.range(0, msgCount).forEach(value -> {
+        for (int i = 0; i < msgCount; i++){
             KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(
-                    topicName, Integer.toString(value),
-                    Integer.toString(value));
+                    topicName, Integer.toString(i),
+                    Integer.toString(i));
 
             kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
-        });
+        };
     }
 
     SpoutContext initializeSpout(int msgCount) {
@@ -102,30 +101,32 @@ public class SingleTopicKafkaSpoutTest {
         //play 1st tuple
         ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class);
         context.spout.nextTuple();
-        verify(context.collector).emit(anyObject(), anyObject(), messageIdToDoubleAck.capture());
+        verify(context.collector).emit(anyString(), anyList(), messageIdToDoubleAck.capture());
         context.spout.ack(messageIdToDoubleAck.getValue());
 
-        IntStream.range(0, messageCount/2).forEach(value -> {
+        for (int i = 0; i < messageCount/2; i++) {
             context.spout.nextTuple();
-        });
+        };
 
         context.spout.ack(messageIdToDoubleAck.getValue());
 
-        IntStream.range(0, messageCount).forEach(value -> {
+        for (int i = 0; i < messageCount; i++) {
             context.spout.nextTuple();
-        });
+        };
 
         ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
 
         verify(context.collector, times(messageCount)).emit(
                 eq(SingleTopicKafkaSpoutConfiguration.STREAM),
-                anyObject(),
+                anyList(),
                 remainingIds.capture());
-        remainingIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+        for (Object id : remainingIds.getAllValues()) {
+            context.spout.ack(id);
+        }
 
-        context.spout.acked.values().forEach(item -> {
+        for(Object item : context.spout.acked.values()) {
             assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
-        });
+        };
     }
 
     @Test
@@ -134,22 +135,22 @@ public class SingleTopicKafkaSpoutTest {
         SpoutContext context = initializeSpout(messageCount);
 
 
-        IntStream.range(0, messageCount).forEach(value -> {
+        for (int i = 0; i < messageCount; i++) {
             context.spout.nextTuple();
             ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class);
             verify(context.collector).emit(
                     eq(SingleTopicKafkaSpoutConfiguration.STREAM),
                     eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC,
-                            Integer.toString(value),
-                            Integer.toString(value))),
+                            Integer.toString(i),
+                            Integer.toString(i))),
             messageId.capture());
             context.spout.ack(messageId.getValue());
             reset(context.collector);
-        });
+        };
 
-        context.spout.acked.values().forEach(item -> {
+        for (Object item : context.spout.acked.values()) {
             assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
-        });
+        };
     }
 
     @Test
@@ -160,14 +161,14 @@ public class SingleTopicKafkaSpoutTest {
         //play and ack 1 tuple
         ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class);
         context.spout.nextTuple();
-        verify(context.collector).emit(anyObject(), anyObject(), messageIdAcked.capture());
+        verify(context.collector).emit(anyString(), anyList(), messageIdAcked.capture());
         context.spout.ack(messageIdAcked.getValue());
         reset(context.collector);
 
         //play and fail 1 tuple
         ArgumentCaptor<Object> messageIdFailed = ArgumentCaptor.forClass(Object.class);
         context.spout.nextTuple();
-        verify(context.collector).emit(anyObject(), anyObject(), messageIdFailed.capture());
+        verify(context.collector).emit(anyString(), anyList(), messageIdFailed.capture());
         context.spout.fail(messageIdFailed.getValue());
         reset(context.collector);
 
@@ -176,22 +177,24 @@ public class SingleTopicKafkaSpoutTest {
 
 
         //allow for some calls to nextTuple() to fail to emit a tuple
-        IntStream.range(0, messageCount + 5).forEach(value -> {
+        for (int i = 0; i < messageCount + 5; i++) {
             context.spout.nextTuple();
-        });
+        };
 
         ArgumentCaptor<Object> remainingMessageIds = ArgumentCaptor.forClass(Object.class);
 
         //1 message replayed, messageCount - 2 messages emitted for the first time
         verify(context.collector, times(messageCount - 1)).emit(
                 eq(SingleTopicKafkaSpoutConfiguration.STREAM),
-                anyObject(),
+                anyList(),
                 remainingMessageIds.capture());
-        remainingMessageIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+        for (Object id : remainingMessageIds.getAllValues()) {
+            context.spout.ack(id);
+        }
 
-        context.spout.acked.values().forEach(item -> {
+        for (Object item : context.spout.acked.values()) {
             assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
-        });
+        };
     }
 
     @Test
@@ -203,13 +206,13 @@ public class SingleTopicKafkaSpoutTest {
         //play 1st tuple
         ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class);
         context.spout.nextTuple();
-        verify(context.collector).emit(anyObject(), anyObject(), messageIdToFail.capture());
+        verify(context.collector).emit(anyString(), anyList(), messageIdToFail.capture());
         reset(context.collector);
 
         //play 2nd tuple
         ArgumentCaptor<Object> messageIdToAck = ArgumentCaptor.forClass(Object.class);
         context.spout.nextTuple();
-        verify(context.collector).emit(anyObject(), anyObject(), messageIdToAck.capture());
+        verify(context.collector).emit(anyString(), anyList(), messageIdToAck.capture());
         reset(context.collector);
 
         //ack 2nd tuple
@@ -221,20 +224,22 @@ public class SingleTopicKafkaSpoutTest {
         Thread.sleep(200);
 
         //allow for some calls to nextTuple() to fail to emit a tuple
-        IntStream.range(0, messageCount + 5).forEach(value -> {
+        for (int i = 0; i < messageCount + 5; i++) {
             context.spout.nextTuple();
-        });
+        };
 
         ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
         //1 message replayed, messageCount - 2 messages emitted for the first time
         verify(context.collector, times(messageCount - 1)).emit(
                 eq(SingleTopicKafkaSpoutConfiguration.STREAM),
-                anyObject(),
+                anyList(),
                 remainingIds.capture());
-        remainingIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+        for (Object id : remainingIds.getAllValues()) {
+            context.spout.ack(id);
+        };
 
-        context.spout.acked.values().forEach(item -> {
+        for (Object item : context.spout.acked.values()) {
             assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
-        });
+        };
     }
 }
\ No newline at end of file