You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2017/02/15 21:38:14 UTC

[1/3] storm git commit: STORM-2250: Kafka spout refactoring to increase modularity and testability. Also support nanoseconds in Storm time simulation

Repository: storm
Updated Branches:
  refs/heads/master db4695d2d -> d235a0c1b


STORM-2250: Kafka spout refactoring to increase modularity and testability. Also support nanoseconds in Storm time simulation


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6e75016c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6e75016c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6e75016c

Branch: refs/heads/master
Commit: 6e75016c45c602c874086dea26324ca413f0c141
Parents: db4695d
Author: Stig Rohde D�ssing <sd...@it-minds.dk>
Authored: Tue Feb 14 21:31:45 2017 +0100
Committer: Stig Rohde D�ssing <sd...@it-minds.dk>
Committed: Tue Feb 14 21:31:45 2017 +0100

----------------------------------------------------------------------
 external/storm-kafka-client/pom.xml             |   9 +-
 .../apache/storm/kafka/spout/KafkaSpout.java    | 159 ++-------
 .../kafka/spout/internal/OffsetManager.java     | 157 +++++++++
 .../storm/kafka/spout/internal/Timer.java       |   7 +-
 .../spout/ByTopicRecordTranslatorTest.java      |   2 +-
 .../spout/DefaultRecordTranslatorTest.java      |   2 +-
 .../storm/kafka/spout/KafkaSpoutConfigTest.java |   4 +-
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |  82 ++---
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  | 334 ++++++++++---------
 .../test/KafkaSpoutTopologyMainNamedTopics.java |   6 +-
 .../KafkaSpoutTopologyMainWildcardTopics.java   |   2 +-
 pom.xml                                         |   1 -
 .../src/jvm/org/apache/storm/utils/Time.java    | 146 +++++---
 13 files changed, 524 insertions(+), 387 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index 97ed359..0fdb64d 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -77,7 +77,13 @@
         </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>
-            <artifactId>hamcrest-all</artifactId>
+            <artifactId>hamcrest-core</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-library</artifactId>
             <version>1.3</version>
             <scope>test</scope>
         </dependency>
@@ -90,7 +96,6 @@
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>log4j-over-slf4j</artifactId>
-            <version>${log4j-over-slf4j.version}</version>
             <scope>test</scope>
         </dependency>
     </dependencies>

http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index b96f3f9..f8a576c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -25,16 +25,13 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -48,6 +45,7 @@ import org.apache.kafka.common.errors.RetriableException;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
 import org.apache.storm.kafka.spout.internal.Timer;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -58,19 +56,19 @@ import org.slf4j.LoggerFactory;
 
 public class KafkaSpout<K, V> extends BaseRichSpout {
     private static final long serialVersionUID = 4151921085047987154L;
+    //Initial delay for the commit and subscription refresh timers
+    public static final long TIMER_DELAY_MS = 500;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
-    private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator();
 
     // Storm
     protected SpoutOutputCollector collector;
 
     // Kafka
     private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
-    private final KafkaConsumerFactory kafkaConsumerFactory;
+    private KafkaConsumerFactory kafkaConsumerFactory;
     private transient KafkaConsumer<K, V> kafkaConsumer;
     private transient boolean consumerAutoCommitMode;
 
-
     // Bookkeeping
     private transient FirstPollOffsetStrategy firstPollOffsetStrategy;  // Strategy to determine the fetch offset of the first realized by the spout upon activation
     private transient KafkaSpoutRetryService retryService;              // Class that has the logic to handle tuple failure
@@ -78,7 +76,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient boolean initialized;                              // Flag indicating that the spout is still undergoing initialization process.
     // Initialization is only complete after the first call to  KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
 
-    transient Map<TopicPartition, OffsetEntry> acked;           // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate. Not used if it's AutoCommitMode
+    private transient Map<TopicPartition, OffsetManager> acked;           // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
     private transient Set<KafkaSpoutMessageId> emitted;                 // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. Not used if it's AutoCommitMode
     private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;         // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
     private transient long numUncommittedOffsets;                       // Number of offsets that have been polled and emitted but not yet been committed. Not used if it's AutoCommitMode
@@ -87,13 +85,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
 
     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
-        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault());
+        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>());
     }
     
     //This constructor is here for testing
     KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
-        this.kafkaSpoutConfig = kafkaSpoutConfig;                 // Pass in configuration
         this.kafkaConsumerFactory = kafkaConsumerFactory;
+        this.kafkaSpoutConfig = kafkaSpoutConfig;
     }
 
     @Override
@@ -114,9 +112,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         retryService = kafkaSpoutConfig.getRetryService();
 
         if (!consumerAutoCommitMode) {     // If it is auto commit, no need to commit offsets manually
-            commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
+            commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
         }
-        refreshSubscriptionTimer = new Timer(500, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
+        refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
 
         acked = new HashMap<>();
         emitted = new HashSet<>();
@@ -198,7 +196,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private void setAcked(TopicPartition tp, long fetchOffset) {
         // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
         if (!consumerAutoCommitMode && !acked.containsKey(tp)) {
-            acked.put(tp, new OffsetEntry(tp, fetchOffset));
+            acked.put(tp, new OffsetManager(tp, fetchOffset));
         }
     }
 
@@ -290,7 +288,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             if (offsetAndMeta != null) {
                 kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
             } else {
-                kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);    // Seek to last committed offset
+                kafkaConsumer.seek(rtp, acked.get(rtp).getCommittedOffset() + 1);    // Seek to last committed offset
             }
         }
     }
@@ -347,7 +345,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private void commitOffsetsForAckedTuples() {
         // Find offsets that are ready to be committed for every topic partition
         final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
-        for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) {
+        for (Map.Entry<TopicPartition, OffsetManager> tpOffset : acked.entrySet()) {
             final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset();
             if (nextCommitOffset != null) {
                 nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
@@ -360,9 +358,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets);
             // Instead of iterating again, it would be possible to commit and update the state for each TopicPartition
             // in the prior loop, but the multiple network calls should be more expensive than iterating twice over a small loop
-            for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) {
-                final OffsetEntry offsetEntry = tpOffset.getValue();
-                offsetEntry.commit(nextCommitOffsets.get(tpOffset.getKey()));
+            for (Map.Entry<TopicPartition, OffsetAndMetadata> tpOffset : nextCommitOffsets.entrySet()) {
+                //Update the OffsetManager for each committed partition, and update numUncommittedOffsets
+                final TopicPartition tp = tpOffset.getKey();
+                final OffsetManager offsetManager = acked.get(tp);
+                long numCommittedOffsets = offsetManager.commit(tpOffset.getValue());
+                numUncommittedOffsets -= numCommittedOffsets;
+                LOG.debug("[{}] uncommitted offsets across all topic partitions",
+                    numUncommittedOffsets);
             }
         } else {
             LOG.trace("No offsets to commit. {}", this);
@@ -483,127 +486,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private String getTopicsString() {
         return kafkaSpoutConfig.getSubscription().getTopicsString();
     }
+}
 
-    // ======= Offsets Commit Management ==========
-
-    private static class OffsetComparator implements Comparator<KafkaSpoutMessageId> {
-        public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) {
-            return m1.offset() < m2.offset() ? -1 : m1.offset() == m2.offset() ? 0 : 1;
-        }
-    }
-
-    /**
-     * This class is not thread safe
-     */
-    class OffsetEntry {
-        private final TopicPartition tp;
-        private final long initialFetchOffset;  /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
-                                                 * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
-        private long committedOffset;     // last offset committed to Kafka. Initially it is set to fetchOffset - 1
-        private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR);     // acked messages sorted by ascending order of offset
-
-        public OffsetEntry(TopicPartition tp, long initialFetchOffset) {
-            this.tp = tp;
-            this.initialFetchOffset = initialFetchOffset;
-            this.committedOffset = initialFetchOffset - 1;
-            LOG.debug("Instantiated {}", this);
-        }
-
-        public void add(KafkaSpoutMessageId msgId) {          // O(Log N)
-            ackedMsgs.add(msgId);
-        }
-
-        /**
-         * An offset is only committed when all records with lower offset have
-         * been acked. This guarantees that all offsets smaller than the
-         * committedOffset have been delivered.
-         * @return the next OffsetAndMetadata to commit, or null if no offset is ready to commit.
-         */
-        public OffsetAndMetadata findNextCommitOffset() {
-            boolean found = false;
-            long currOffset;
-            long nextCommitOffset = committedOffset;
-            KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience variable to make it faster to create OffsetAndMetadata
-
-            for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
-                if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) {            // found the next offset to commit
-                    found = true;
-                    nextCommitMsg = currAckedMsg;
-                    nextCommitOffset = currOffset;
-                } else if (currAckedMsg.offset() > nextCommitOffset + 1) {    // offset found is not continuous to the offsets listed to go in the next commit, so stop search
-                    LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
-                    break;
-                } else {
-                    //Received a redundant ack. Ignore and continue processing.
-                    LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed Offset [{}]",
-                            tp, currOffset,  committedOffset);
-                }
-            }
-
-            OffsetAndMetadata nextCommitOffsetAndMetadata = null;
-            if (found) {
-                nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread()));
-                LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed",tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset());
-            } else {
-                LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp);
-            }
-            LOG.trace("{}", this);
-            return nextCommitOffsetAndMetadata;
-        }
-
-        /**
-         * Marks an offset has committed. This method has side effects - it sets the internal state in such a way that future
-         * calls to {@link #findNextCommitOffset()} will return offsets greater than the offset specified, if any.
-         *
-         * @param committedOffset offset to be marked as committed
-         */
-        public void commit(OffsetAndMetadata committedOffset) {
-            long numCommittedOffsets = 0;
-            if (committedOffset != null) {
-                final long oldCommittedOffset = this.committedOffset;
-                numCommittedOffsets = committedOffset.offset() - this.committedOffset;
-                this.committedOffset = committedOffset.offset();
-                for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext(); ) {
-                    if (iterator.next().offset() <= committedOffset.offset()) {
-                        iterator.remove();
-                    } else {
-                        break;
-                    }
-                }
-                numUncommittedOffsets-= numCommittedOffsets;
-                LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}]. [{}] uncommitted offsets across all topic partitions",
-                        oldCommittedOffset + 1, this.committedOffset, numCommittedOffsets, tp, numUncommittedOffsets);
-            } else {
-                LOG.debug("Committed [{}] offsets for topic-partition [{}]. [{}] uncommitted offsets across all topic partitions",
-                        numCommittedOffsets, tp, numUncommittedOffsets);
-            }
-            LOG.trace("{}", this);
-        }
-
-        long getCommittedOffset() {
-            return committedOffset;
-        }
-
-        public boolean isEmpty() {
-            return ackedMsgs.isEmpty();
-        }
 
-        public boolean contains(ConsumerRecord<K, V> record) {
-            return contains(new KafkaSpoutMessageId(record));
-        }
-
-        public boolean contains(KafkaSpoutMessageId msgId) {
-            return ackedMsgs.contains(msgId);
-        }
 
-        @Override
-        public String toString() {
-            return "OffsetEntry{" +
-                    "topic-partition=" + tp +
-                    ", fetchOffset=" + initialFetchOffset +
-                    ", committedOffset=" + committedOffset +
-                    ", ackedMsgs=" + ackedMsgs +
-                    '}';
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
new file mode 100755
index 0000000..4ce0471
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.internal;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages acked and committed offsets for a TopicPartition. This class is not thread safe
+ */
+public class OffsetManager {
+
+    private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator();
+    private static final Logger LOG = LoggerFactory.getLogger(OffsetManager.class);
+    private final TopicPartition tp;
+    /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
+    * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
+    private final long initialFetchOffset;
+    // Last offset committed to Kafka. Initially it is set to fetchOffset - 1
+    private long committedOffset;
+    // Acked messages sorted by ascending order of offset
+    private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR);
+
+    public OffsetManager(TopicPartition tp, long initialFetchOffset) {
+        this.tp = tp;
+        this.initialFetchOffset = initialFetchOffset;
+        this.committedOffset = initialFetchOffset - 1;
+        LOG.debug("Instantiated {}", this);
+    }
+
+    public void add(KafkaSpoutMessageId msgId) {          // O(Log N)
+        ackedMsgs.add(msgId);
+    }
+
+    /**
+     * An offset is only committed when all records with lower offset have been
+     * acked. This guarantees that all offsets smaller than the committedOffset
+     * have been delivered.
+     *
+     * @return the next OffsetAndMetadata to commit, or null if no offset is
+     * ready to commit.
+     */
+    public OffsetAndMetadata findNextCommitOffset() {
+        boolean found = false;
+        long currOffset;
+        long nextCommitOffset = committedOffset;
+        KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience variable to make it faster to create OffsetAndMetadata
+
+        for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
+            if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) {            // found the next offset to commit
+                found = true;
+                nextCommitMsg = currAckedMsg;
+                nextCommitOffset = currOffset;
+            } else if (currAckedMsg.offset() > nextCommitOffset + 1) {    // offset found is not continuous to the offsets listed to go in the next commit, so stop search
+                LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
+                break;
+            } else {
+                //Received a redundant ack. Ignore and continue processing.
+                LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed Offset [{}]",
+                    tp, currOffset, committedOffset);
+            }
+        }
+
+        OffsetAndMetadata nextCommitOffsetAndMetadata = null;
+        if (found) {
+            nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread()));
+            LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset());
+        } else {
+            LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp);
+        }
+        LOG.trace("{}", this);
+        return nextCommitOffsetAndMetadata;
+    }
+
+    /**
+     * Marks an offset has committed. This method has side effects - it sets the
+     * internal state in such a way that future calls to
+     * {@link #findNextCommitOffset()} will return offsets greater than the
+     * offset specified, if any.
+     *
+     * @param committedOffset offset to be marked as committed
+     * @return Number of offsets committed in this commit
+     */
+    public long commit(OffsetAndMetadata committedOffset) {
+        long preCommitCommittedOffsets = this.committedOffset;
+        long numCommittedOffsets = committedOffset.offset() - this.committedOffset;
+        this.committedOffset = committedOffset.offset();
+        for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext();) {
+            if (iterator.next().offset() <= committedOffset.offset()) {
+                iterator.remove();
+            } else {
+                break;
+            }
+        }
+        LOG.trace("{}", this);
+        
+        LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}].",
+                    preCommitCommittedOffsets + 1, this.committedOffset, numCommittedOffsets, tp);
+        
+        return numCommittedOffsets;
+    }
+
+    public long getCommittedOffset() {
+        return committedOffset;
+    }
+
+    public boolean isEmpty() {
+        return ackedMsgs.isEmpty();
+    }
+
+    public boolean contains(ConsumerRecord record) {
+        return contains(new KafkaSpoutMessageId(record));
+    }
+
+    public boolean contains(KafkaSpoutMessageId msgId) {
+        return ackedMsgs.contains(msgId);
+    }
+
+    @Override
+    public String toString() {
+        return "OffsetManager{"
+            + "topic-partition=" + tp
+            + ", fetchOffset=" + initialFetchOffset
+            + ", committedOffset=" + committedOffset
+            + ", ackedMsgs=" + ackedMsgs
+            + '}';
+    }
+
+    private static class OffsetComparator implements Comparator<KafkaSpoutMessageId> {
+
+        @Override
+        public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) {
+            return m1.offset() < m2.offset() ? -1 : m1.offset() == m2.offset() ? 0 : 1;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
index d51104d..2a2e1cb 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
@@ -18,6 +18,7 @@
 package org.apache.storm.kafka.spout.internal;
 
 import java.util.concurrent.TimeUnit;
+import org.apache.storm.utils.Time;
 
 public class Timer {
     private final long delay;
@@ -41,7 +42,7 @@ public class Timer {
         this.timeUnit = timeUnit;
 
         periodNanos = timeUnit.toNanos(period);
-        start = System.nanoTime() + timeUnit.toNanos(delay);
+        start = Time.nanoTime() + timeUnit.toNanos(delay);
     }
 
     public long period() {
@@ -65,9 +66,9 @@ public class Timer {
      * otherwise.
      */
     public boolean isExpiredResetOnTrue() {
-        final boolean expired = System.nanoTime() - start > periodNanos;
+        final boolean expired = Time.nanoTime() - start >= periodNanos;
         if (expired) {
-            start = System.nanoTime();
+            start = Time.nanoTime();
         }
         return expired;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
index fd53b15..1e4b43b 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 import java.util.Arrays;
 import java.util.HashSet;

http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
index f4275e4..681953d 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 import java.util.Arrays;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
index 08220dd..57e0120 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -17,7 +17,9 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.util.HashMap;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 68fd4a6..b882b67 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -44,6 +45,8 @@ import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -55,20 +58,18 @@ public class KafkaSpoutRebalanceTest {
     @Captor
     private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
 
-    private TopologyContext contextMock;
-    private SpoutOutputCollector collectorMock;
-    private Map<String, Object> conf;
+    private final long offsetCommitPeriodMs = 2_000;
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
     private KafkaConsumer<String, String> consumerMock;
-    private KafkaConsumerFactory<String, String> consumerFactoryMock;
+    private KafkaConsumerFactory<String, String> consumerFactory;
 
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
-        contextMock = mock(TopologyContext.class);
-        collectorMock = mock(SpoutOutputCollector.class);
-        conf = new HashMap<>();
         consumerMock = mock(KafkaConsumer.class);
-        consumerFactoryMock = (kafkaSpoutConfig) -> consumerMock;
+        consumerFactory = (kafkaSpoutConfig) -> consumerMock;
     }
 
     //Returns messageIds in order of emission
@@ -93,9 +94,9 @@ public class KafkaSpoutRebalanceTest {
         Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
         secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
         when(consumerMock.poll(anyLong()))
-                .thenReturn(new ConsumerRecords(firstPartitionRecords))
-                .thenReturn(new ConsumerRecords(secondPartitionRecords))
-                .thenReturn(new ConsumerRecords(Collections.emptyMap()));
+            .thenReturn(new ConsumerRecords(firstPartitionRecords))
+            .thenReturn(new ConsumerRecords(secondPartitionRecords))
+            .thenReturn(new ConsumerRecords(Collections.emptyMap()));
 
         //Emit the messages
         spout.nextTuple();
@@ -109,7 +110,7 @@ public class KafkaSpoutRebalanceTest {
         //Now rebalance
         consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
         consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition));
-        
+
         List<KafkaSpoutMessageId> emittedMessageIds = new ArrayList<>();
         emittedMessageIds.add(messageIdForRevokedPartition.getValue());
         emittedMessageIds.add(messageIdForAssignedPartition.getValue());
@@ -119,47 +120,48 @@ public class KafkaSpoutRebalanceTest {
     @Test
     public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
         //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
-        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, 10), consumerFactoryMock);
-        String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
-        TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
-        TopicPartition assignedPartition = new TopicPartition(topic, 2);
-        
-        //Emit a message on each partition and revoke the first partition
-        List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
-        
-        //Ack both emitted tuples
-        spout.ack(emittedMessageIds.get(0));
-        spout.ack(emittedMessageIds.get(1));
-
-        //Ensure the commit timer has expired
-        Thread.sleep(510);
-
-        //Make the spout commit any acked tuples
-        spout.nextTuple();
-        //Verify that it only committed the message on the assigned partition
-        verify(consumerMock).commitSync(commitCapture.capture());
-
-        Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = commitCapture.getValue();
-        assertThat(commitCaptureMap, hasKey(assignedPartition));
-        assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked)));
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, this.offsetCommitPeriodMs), consumerFactory);
+            String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
+            TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
+            TopicPartition assignedPartition = new TopicPartition(topic, 2);
+
+            //Emit a message on each partition and revoke the first partition
+            List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+
+            //Ack both emitted tuples
+            spout.ack(emittedMessageIds.get(0));
+            spout.ack(emittedMessageIds.get(1));
+
+            //Ensure the commit timer has expired
+            Time.advanceTime(offsetCommitPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+            //Make the spout commit any acked tuples
+            spout.nextTuple();
+            //Verify that it only committed the message on the assigned partition
+            verify(consumerMock, times(1)).commitSync(commitCapture.capture());
+
+            Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = commitCapture.getValue();
+            assertThat(commitCaptureMap, hasKey(assignedPartition));
+            assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked)));
+        }
     }
-    
+
     @Test
     public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
         //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
         KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
-        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, 10, retryServiceMock), consumerFactoryMock);
+        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, 10, retryServiceMock), consumerFactory);
         String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
         TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
         TopicPartition assignedPartition = new TopicPartition(topic, 2);
-        
+
         //Emit a message on each partition and revoke the first partition
         List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
-        
+
         //Fail both emitted tuples
         spout.fail(emittedMessageIds.get(0));
         spout.fail(emittedMessageIds.get(1));
-        
+
         //Check that only the tuple on the currently assigned partition is retried
         verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0));
         verify(retryServiceMock).schedule(emittedMessageIds.get(1));

http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index c5e4e31..fdc9734 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.storm.kafka.spout;
 
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
+
 import info.batey.kafka.unit.KafkaUnitRule;
 import kafka.producer.KeyedMessage;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -28,21 +30,39 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
-import static org.junit.Assert.*;
-
 import java.util.Map;
 import java.util.stream.IntStream;
-import static org.mockito.Mockito.*;
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.*;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.HashMap;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
+import org.mockito.Captor;
+import org.mockito.MockitoAnnotations;
 
 public class SingleTopicKafkaSpoutTest {
 
     private class SpoutContext {
+
         public KafkaSpout<String, String> spout;
         public SpoutOutputCollector collector;
 
         public SpoutContext(KafkaSpout<String, String> spout,
-                            SpoutOutputCollector collector) {
+            SpoutOutputCollector collector) {
             this.spout = spout;
             this.collector = collector;
         }
@@ -51,190 +71,206 @@ public class SingleTopicKafkaSpoutTest {
     @Rule
     public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
 
-    void populateTopicData(String topicName, int msgCount) {
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+    private final TopologyContext topologyContext = mock(TopologyContext.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+    private final long commitOffsetPeriodMs = 2_000;
+    private KafkaConsumer<String, String> consumerSpy;
+    private KafkaConsumerFactory<String, String> consumerFactory;
+    private KafkaSpout<String, String> spout;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+        KafkaSpoutConfig spoutConfig = getKafkaSpoutConfig(kafkaUnitRule.getKafkaPort(), commitOffsetPeriodMs);
+        this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
+        this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy;
+        this.spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+    }
+
+    private void populateTopicData(String topicName, int msgCount) {
         kafkaUnitRule.getKafkaUnit().createTopic(topicName);
 
         IntStream.range(0, msgCount).forEach(value -> {
             KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(
-                    topicName, Integer.toString(value),
-                    Integer.toString(value));
+                topicName, Integer.toString(value),
+                Integer.toString(value));
 
             kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
         });
     }
 
-    SpoutContext initializeSpout(int msgCount) {
+    private void initializeSpout(int msgCount) {
         populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
-        int kafkaPort = kafkaUnitRule.getKafkaPort();
-
-        TopologyContext topology = mock(TopologyContext.class);
-        SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
-        Map conf = mock(Map.class);
-
-        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(kafkaPort));
-        spout.open(conf, topology, collector);
+        spout.open(conf, topologyContext, collector);
         spout.activate();
-        return new SpoutContext(spout, collector);
     }
+
     /*
-     * Asserts that the next possible offset to commit or the committed offset is the provided offset.
-     * An offset that is ready to be committed is not guarenteed to be already committed.
+     * Asserts that commitSync has been called once, 
+     * that there are only commits on one topic,
+     * and that the committed offset covers messageCount messages
      */
-    private void assertOffsetCommitted(int offset, KafkaSpout.OffsetEntry entry) {
-
-        boolean currentOffsetMatch = entry.getCommittedOffset() == offset;
-        OffsetAndMetadata nextOffset = entry.findNextCommitOffset();
-        boolean nextOffsetMatch =  nextOffset != null && nextOffset.offset() == offset;
-        assertTrue("Next offset: " +
-                        entry.findNextCommitOffset() +
-                        " OR current offset: " +
-                        entry.getCommittedOffset() +
-                        " must equal desired offset: " +
-                        offset,
-                currentOffsetMatch | nextOffsetMatch);
+    private void verifyAllMessagesCommitted(long messageCount) {
+        verify(consumerSpy, times(1)).commitSync(commitCapture.capture());
+        Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
+        assertThat("Expected commits for only one topic partition", commits.entrySet().size(), is(1));
+        OffsetAndMetadata offset = commits.entrySet().iterator().next().getValue();
+        assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount - 1));
     }
 
     @Test
     public void shouldContinueWithSlowDoubleAcks() throws Exception {
-        int messageCount = 20;
-        SpoutContext context = initializeSpout(messageCount);
-
-        //play 1st tuple
-        ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class);
-        context.spout.nextTuple();
-        verify(context.collector).emit(anyObject(), anyObject(), messageIdToDoubleAck.capture());
-        context.spout.ack(messageIdToDoubleAck.getValue());
-
-        IntStream.range(0, messageCount/2).forEach(value -> {
-            context.spout.nextTuple();
-        });
-
-        context.spout.ack(messageIdToDoubleAck.getValue());
-
-        IntStream.range(0, messageCount).forEach(value -> {
-            context.spout.nextTuple();
-        });
-
-        ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
-
-        verify(context.collector, times(messageCount)).emit(
-                eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            int messageCount = 20;
+            initializeSpout(messageCount);
+
+            //play 1st tuple
+            ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class);
+            spout.nextTuple();
+            verify(collector).emit(anyObject(), anyObject(), messageIdToDoubleAck.capture());
+            spout.ack(messageIdToDoubleAck.getValue());
+
+            //Emit some more messages
+            IntStream.range(0, messageCount / 2).forEach(value -> {
+                spout.nextTuple();
+            });
+
+            spout.ack(messageIdToDoubleAck.getValue());
+
+            //Emit any remaining messages
+            IntStream.range(0, messageCount).forEach(value -> {
+                spout.nextTuple();
+            });
+
+            //Verify that all messages are emitted, ack all the messages
+            ArgumentCaptor<Object> messageIds = ArgumentCaptor.forClass(Object.class);
+            verify(collector, times(messageCount)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM),
                 anyObject(),
-                remainingIds.capture());
-        remainingIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+                messageIds.capture());
+            messageIds.getAllValues().iterator().forEachRemaining(spout::ack);
 
-        context.spout.acked.values().forEach(item -> {
-            assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
-        });
+            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+            //Commit offsets
+            spout.nextTuple();
+
+            verifyAllMessagesCommitted(messageCount);
+        }
     }
 
     @Test
     public void shouldEmitAllMessages() throws Exception {
-        int messageCount = 10;
-        SpoutContext context = initializeSpout(messageCount);
-
-
-        IntStream.range(0, messageCount).forEach(value -> {
-            context.spout.nextTuple();
-            ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class);
-            verify(context.collector).emit(
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            int messageCount = 10;
+            initializeSpout(messageCount);
+
+            //Emit all messages and check that they are emitted. Ack the messages too
+            IntStream.range(0, messageCount).forEach(value -> {
+                spout.nextTuple();
+                ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class);
+                verify(collector).emit(
                     eq(SingleTopicKafkaSpoutConfiguration.STREAM),
                     eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC,
-                            Integer.toString(value),
-                            Integer.toString(value))),
-            messageId.capture());
-            context.spout.ack(messageId.getValue());
-            reset(context.collector);
-        });
-
-        context.spout.acked.values().forEach(item -> {
-            assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
-        });
+                        Integer.toString(value),
+                        Integer.toString(value))),
+                    messageId.capture());
+                spout.ack(messageId.getValue());
+                reset(collector);
+            });
+
+            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+            //Commit offsets
+            spout.nextTuple();
+
+            verifyAllMessagesCommitted(messageCount);
+        }
     }
 
     @Test
     public void shouldReplayInOrderFailedMessages() throws Exception {
-        int messageCount = 10;
-        SpoutContext context = initializeSpout(messageCount);
-
-        //play and ack 1 tuple
-        ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class);
-        context.spout.nextTuple();
-        verify(context.collector).emit(anyObject(), anyObject(), messageIdAcked.capture());
-        context.spout.ack(messageIdAcked.getValue());
-        reset(context.collector);
-
-        //play and fail 1 tuple
-        ArgumentCaptor<Object> messageIdFailed = ArgumentCaptor.forClass(Object.class);
-        context.spout.nextTuple();
-        verify(context.collector).emit(anyObject(), anyObject(), messageIdFailed.capture());
-        context.spout.fail(messageIdFailed.getValue());
-        reset(context.collector);
-
-        //pause so that failed tuples will be retried
-        Thread.sleep(200);
-
-
-        //allow for some calls to nextTuple() to fail to emit a tuple
-        IntStream.range(0, messageCount + 5).forEach(value -> {
-            context.spout.nextTuple();
-        });
-
-        ArgumentCaptor<Object> remainingMessageIds = ArgumentCaptor.forClass(Object.class);
-
-        //1 message replayed, messageCount - 2 messages emitted for the first time
-        verify(context.collector, times(messageCount - 1)).emit(
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            int messageCount = 10;
+            initializeSpout(messageCount);
+
+            //play and ack 1 tuple
+            ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class);
+            spout.nextTuple();
+            verify(collector).emit(anyObject(), anyObject(), messageIdAcked.capture());
+            spout.ack(messageIdAcked.getValue());
+            reset(collector);
+
+            //play and fail 1 tuple
+            ArgumentCaptor<Object> messageIdFailed = ArgumentCaptor.forClass(Object.class);
+            spout.nextTuple();
+            verify(collector).emit(anyObject(), anyObject(), messageIdFailed.capture());
+            spout.fail(messageIdFailed.getValue());
+            reset(collector);
+
+            //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait.
+            IntStream.range(0, messageCount).forEach(value -> {
+                spout.nextTuple();
+            });
+
+            ArgumentCaptor<Object> remainingMessageIds = ArgumentCaptor.forClass(Object.class);
+            //All messages except the first acked message should have been emitted
+            verify(collector, times(messageCount - 1)).emit(
                 eq(SingleTopicKafkaSpoutConfiguration.STREAM),
                 anyObject(),
                 remainingMessageIds.capture());
-        remainingMessageIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+            remainingMessageIds.getAllValues().iterator().forEachRemaining(spout::ack);
 
-        context.spout.acked.values().forEach(item -> {
-            assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
-        });
+            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+            //Commit offsets
+            spout.nextTuple();
+
+            verifyAllMessagesCommitted(messageCount);
+        }
     }
 
     @Test
     public void shouldReplayFirstTupleFailedOutOfOrder() throws Exception {
-        int messageCount = 10;
-        SpoutContext context = initializeSpout(messageCount);
-
-
-        //play 1st tuple
-        ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class);
-        context.spout.nextTuple();
-        verify(context.collector).emit(anyObject(), anyObject(), messageIdToFail.capture());
-        reset(context.collector);
-
-        //play 2nd tuple
-        ArgumentCaptor<Object> messageIdToAck = ArgumentCaptor.forClass(Object.class);
-        context.spout.nextTuple();
-        verify(context.collector).emit(anyObject(), anyObject(), messageIdToAck.capture());
-        reset(context.collector);
-
-        //ack 2nd tuple
-        context.spout.ack(messageIdToAck.getValue());
-        //fail 1st tuple
-        context.spout.fail(messageIdToFail.getValue());
-
-        //pause so that failed tuples will be retried
-        Thread.sleep(200);
-
-        //allow for some calls to nextTuple() to fail to emit a tuple
-        IntStream.range(0, messageCount + 5).forEach(value -> {
-            context.spout.nextTuple();
-        });
-
-        ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
-        //1 message replayed, messageCount - 2 messages emitted for the first time
-        verify(context.collector, times(messageCount - 1)).emit(
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            int messageCount = 10;
+            initializeSpout(messageCount);
+
+            //play 1st tuple
+            ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class);
+            spout.nextTuple();
+            verify(collector).emit(anyObject(), anyObject(), messageIdToFail.capture());
+            reset(collector);
+
+            //play 2nd tuple
+            ArgumentCaptor<Object> messageIdToAck = ArgumentCaptor.forClass(Object.class);
+            spout.nextTuple();
+            verify(collector).emit(anyObject(), anyObject(), messageIdToAck.capture());
+            reset(collector);
+
+            //ack 2nd tuple
+            spout.ack(messageIdToAck.getValue());
+            //fail 1st tuple
+            spout.fail(messageIdToFail.getValue());
+
+            //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait.
+            IntStream.range(0, messageCount).forEach(value -> {
+                spout.nextTuple();
+            });
+
+            ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
+            //All messages except the first acked message should have been emitted
+            verify(collector, times(messageCount - 1)).emit(
                 eq(SingleTopicKafkaSpoutConfiguration.STREAM),
                 anyObject(),
                 remainingIds.capture());
-        remainingIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+            remainingIds.getAllValues().iterator().forEachRemaining(spout::ack);
 
-        context.spout.acked.values().forEach(item -> {
-            assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
-        });
+            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+            //Commit offsets
+            spout.nextTuple();
+
+            verifyAllMessagesCommitted(messageCount);
+        }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
index 2aeeb95..e305c8a 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
@@ -50,9 +50,9 @@ public class KafkaSpoutTopologyMainNamedTopics {
 
     protected void runMain(String[] args) throws Exception {
         if (args.length == 0) {
-            submitTopologyLocalCluster(getTopolgyKafkaSpout(), getConfig());
+            submitTopologyLocalCluster(getTopologyKafkaSpout(), getConfig());
         } else {
-            submitTopologyRemoteCluster(args[0], getTopolgyKafkaSpout(), getConfig());
+            submitTopologyRemoteCluster(args[0], getTopologyKafkaSpout(), getConfig());
         }
     }
 
@@ -82,7 +82,7 @@ public class KafkaSpoutTopologyMainNamedTopics {
         return config;
     }
 
-    protected StormTopology getTopolgyKafkaSpout() {
+    protected StormTopology getTopologyKafkaSpout() {
         final TopologyBuilder tp = new TopologyBuilder();
         tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1);
         tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt())

http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
index d0376e6..f811c7a 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
@@ -37,7 +37,7 @@ public class KafkaSpoutTopologyMainWildcardTopics extends KafkaSpoutTopologyMain
         new KafkaSpoutTopologyMainWildcardTopics().runMain(args);
     }
 
-    protected StormTopology getTopolgyKafkaSpout() {
+    protected StormTopology getTopologyKafkaSpout() {
         final TopologyBuilder tp = new TopologyBuilder();
         tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1);
         tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);

http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 13c4c35..6d3543e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -359,7 +359,6 @@
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>${junit.version}</version>
             <scope>test</scope>
         </dependency>
     </dependencies>

http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/storm-core/src/jvm/org/apache/storm/utils/Time.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Time.java b/storm-core/src/jvm/org/apache/storm/utils/Time.java
index c5c6b6a..0401829 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Time.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Time.java
@@ -24,14 +24,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
+/**
+ * This class implements time simulation support. When time simulation is enabled, methods on this class will use fixed time.
+ * When time simulation is disabled, methods will pass through to relevant java.lang.System/java.lang.Thread calls.
+ * Methods using units higher than nanoseconds will pass through to System.currentTimeMillis(). Methods supporting nanoseconds will pass through to System.nanoTime().
+ */
 public class Time {
     private static final Logger LOG = LoggerFactory.getLogger(Time.class);
     private static AtomicBoolean simulating = new AtomicBoolean(false);
-    private static AtomicLong autoAdvanceOnSleep = new AtomicLong(0);
-    private static volatile Map<Thread, AtomicLong> threadSleepTimes;
+    private static AtomicLong autoAdvanceNanosOnSleep = new AtomicLong(0);
+    private static volatile Map<Thread, AtomicLong> threadSleepTimesNanos;
     private static final Object sleepTimesLock = new Object();
-    private static AtomicLong simulatedCurrTimeMs;
+    private static AtomicLong simulatedCurrTimeNanos;
     
     public static class SimulatedTime implements AutoCloseable {
 
@@ -39,13 +43,13 @@ public class Time {
             this(null);
         }
         
-        public SimulatedTime(Number ms) {
+        public SimulatedTime(Number advanceTimeMs) {
             synchronized(Time.sleepTimesLock) {
                 Time.simulating.set(true);
-                Time.simulatedCurrTimeMs = new AtomicLong(0);
-                Time.threadSleepTimes = new ConcurrentHashMap<>();
-                if (ms != null) {
-                    Time.autoAdvanceOnSleep.set(ms.longValue());
+                Time.simulatedCurrTimeNanos = new AtomicLong(0);
+                Time.threadSleepTimesNanos = new ConcurrentHashMap<>();
+                if (advanceTimeMs != null) {
+                    Time.autoAdvanceNanosOnSleep.set(millisToNanos(advanceTimeMs.longValue()));
                 }
                 LOG.warn("AutoCloseable Simulated Time Starting...");
             }
@@ -55,8 +59,8 @@ public class Time {
         public void close() {
             synchronized(Time.sleepTimesLock) {
                 Time.simulating.set(false);    
-                Time.autoAdvanceOnSleep.set(0);
-                Time.threadSleepTimes = null;
+                Time.autoAdvanceNanosOnSleep.set(0);
+                Time.threadSleepTimesNanos = null;
                 LOG.warn("AutoCloseable Simulated Time Ending...");
             }
         }
@@ -66,8 +70,8 @@ public class Time {
     public static void startSimulating() {
         synchronized(Time.sleepTimesLock) {
             Time.simulating.set(true);
-            Time.simulatedCurrTimeMs = new AtomicLong(0);
-            Time.threadSleepTimes = new ConcurrentHashMap<>();
+            Time.simulatedCurrTimeNanos = new AtomicLong(0);
+            Time.threadSleepTimesNanos = new ConcurrentHashMap<>();
             LOG.warn("Simulated Time Starting...");
         }
     }
@@ -76,8 +80,8 @@ public class Time {
     public static void stopSimulating() {
         synchronized(Time.sleepTimesLock) {
             Time.simulating.set(false);    
-            Time.autoAdvanceOnSleep.set(0);
-            Time.threadSleepTimes = null;
+            Time.autoAdvanceNanosOnSleep.set(0);
+            Time.threadSleepTimesNanos = null;
             LOG.warn("Simulated Time Ending...");
         }
     }
@@ -88,44 +92,66 @@ public class Time {
     
     public static void sleepUntil(long targetTimeMs) throws InterruptedException {
         if(simulating.get()) {
-            try {
-                synchronized(sleepTimesLock) {
-                    if (threadSleepTimes == null) {
+            simulatedSleepUntilNanos(millisToNanos(targetTimeMs));
+        } else {
+            long sleepTimeMs = targetTimeMs - currentTimeMillis();
+            if(sleepTimeMs>0) {
+                Thread.sleep(sleepTimeMs);
+            }
+        }
+    }
+    
+    public static void sleepUntilNanos(long targetTimeNanos) throws InterruptedException {
+        if(simulating.get()) {
+            simulatedSleepUntilNanos(targetTimeNanos);
+        } else {
+            long sleepTimeNanos = targetTimeNanos-nanoTime();
+            long sleepTimeMs = nanosToMillis(sleepTimeNanos);
+            int sleepTimeNanosSansMs = (int)(sleepTimeNanos%1_000_000);
+            if(sleepTimeNanos>0) {
+                Thread.sleep(sleepTimeMs, sleepTimeNanosSansMs);
+            } 
+        }
+    }
+    
+    private static void simulatedSleepUntilNanos(long targetTimeNanos) throws InterruptedException {
+        try {
+            synchronized (sleepTimesLock) {
+                if (threadSleepTimesNanos == null) {
+                    LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE"));
+                    throw new InterruptedException();
+                }
+                threadSleepTimesNanos.put(Thread.currentThread(), new AtomicLong(targetTimeNanos));
+            }
+            while (simulatedCurrTimeNanos.get() < targetTimeNanos) {
+                synchronized (sleepTimesLock) {
+                    if (threadSleepTimesNanos == null) {
                         LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE"));
                         throw new InterruptedException();
                     }
-                    threadSleepTimes.put(Thread.currentThread(), new AtomicLong(targetTimeMs));
                 }
-                while(simulatedCurrTimeMs.get() < targetTimeMs) {
-                    synchronized(sleepTimesLock) {
-                        if (threadSleepTimes == null) {
-                            LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE"));
-                            throw new InterruptedException();
-                        }
-                    }
-                    long autoAdvance = autoAdvanceOnSleep.get();
-                    if (autoAdvance > 0) {
-                        advanceTime(autoAdvance);
-                    }
-                    Thread.sleep(10);
+                long autoAdvance = autoAdvanceNanosOnSleep.get();
+                if (autoAdvance > 0) {
+                    advanceTimeNanos(autoAdvance);
                 }
-            } finally {
-                synchronized(sleepTimesLock) {
-                    if (simulating.get() && threadSleepTimes != null) {
-                        threadSleepTimes.remove(Thread.currentThread());
-                    }
+                Thread.sleep(10);
+            }
+        } finally {
+            synchronized (sleepTimesLock) {
+                if (simulating.get() && threadSleepTimesNanos != null) {
+                    threadSleepTimesNanos.remove(Thread.currentThread());
                 }
             }
-        } else {
-            long sleepTime = targetTimeMs-currentTimeMillis();
-            if(sleepTime>0) 
-                Thread.sleep(sleepTime);
         }
     }
 
     public static void sleep(long ms) throws InterruptedException {
         sleepUntil(currentTimeMillis()+ms);
     }
+    
+    public static void sleepNanos(long nanos) throws InterruptedException {
+        sleepUntilNanos(nanoTime() + nanos);
+    }
 
     public static void sleepSecs (long secs) throws InterruptedException {
         if (secs > 0) {
@@ -133,14 +159,30 @@ public class Time {
         }
     }
     
+    public static long nanoTime() {
+        if (simulating.get()) {
+            return simulatedCurrTimeNanos.get();
+        } else {
+            return System.nanoTime();
+        }
+    }
+    
     public static long currentTimeMillis() {
         if(simulating.get()) {
-            return simulatedCurrTimeMs.get();
+            return nanosToMillis(simulatedCurrTimeNanos.get());
         } else {
             return System.currentTimeMillis();
         }
     }
 
+    public static long nanosToMillis(long nanos) {
+        return nanos/1_000_000;
+    }
+    
+    public static long millisToNanos(long millis) {
+        return millis*1_000_000;
+    }
+    
     public static long secsToMillis (int secs) {
         return 1000*(long) secs;
     }
@@ -162,9 +204,17 @@ public class Time {
     }
     
     public static void advanceTime(long ms) {
-        if (!simulating.get()) throw new IllegalStateException("Cannot simulate time unless in simulation mode");
-        if (ms < 0) throw new IllegalArgumentException("advanceTime only accepts positive time as an argument");
-        long newTime = simulatedCurrTimeMs.addAndGet(ms);
+        advanceTimeNanos(millisToNanos(ms));
+    }
+    
+    public static void advanceTimeNanos(long nanos) {
+        if (!simulating.get()) {
+            throw new IllegalStateException("Cannot simulate time unless in simulation mode");
+        }
+        if (nanos < 0) {
+            throw new IllegalArgumentException("advanceTime only accepts positive time as an argument");
+        }
+        long newTime = simulatedCurrTimeNanos.addAndGet(nanos);
         LOG.debug("Advanced simulated time to {}", newTime);
     }
     
@@ -173,11 +223,13 @@ public class Time {
     }
     
     public static boolean isThreadWaiting(Thread t) {
-        if(!simulating.get()) throw new IllegalStateException("Must be in simulation mode");
+        if(!simulating.get()) {
+            throw new IllegalStateException("Must be in simulation mode");
+        }
         AtomicLong time;
         synchronized(sleepTimesLock) {
-            time = threadSleepTimes.get(t);
+            time = threadSleepTimesNanos.get(t);
         }
-        return !t.isAlive() || time!=null && currentTimeMillis() < time.longValue();
+        return !t.isAlive() || time!=null && nanoTime() < time.longValue();
     }
 }


[2/3] storm git commit: Merge branch 'STORM-2250' of github.com:srdo/storm

Posted by pt...@apache.org.
Merge branch 'STORM-2250' of github.com:srdo/storm


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b8e458f4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b8e458f4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b8e458f4

Branch: refs/heads/master
Commit: b8e458f4a0eb5c79e6099b52a7acf9268b7b655e
Parents: 560ff86 6e75016
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Feb 15 16:33:21 2017 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Feb 15 16:33:21 2017 -0500

----------------------------------------------------------------------
 CHANGELOG.md                                    |   2 +
 LICENSE                                         | 102 +++++-
 docs/storm-kafka-client.md                      |  22 ++
 external/storm-kafka-client/pom.xml             |   9 +-
 .../apache/storm/kafka/spout/KafkaSpout.java    | 200 +++--------
 .../kafka/spout/internal/OffsetManager.java     | 157 +++++++++
 .../storm/kafka/spout/internal/Timer.java       |   7 +-
 .../spout/ByTopicRecordTranslatorTest.java      |   2 +-
 .../spout/DefaultRecordTranslatorTest.java      |   2 +-
 .../storm/kafka/spout/KafkaSpoutConfigTest.java |   4 +-
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |  82 ++---
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  | 334 ++++++++++---------
 .../test/KafkaSpoutTopologyMainNamedTopics.java |   6 +-
 .../KafkaSpoutTopologyMainWildcardTopics.java   |   2 +-
 pom.xml                                         |   1 -
 .../apache/storm/blobstore/BlobStoreUtils.java  |  11 +-
 .../src/jvm/org/apache/storm/utils/Time.java    | 146 +++++---
 storm-core/src/ui/public/css/style.css          |   7 +
 storm-core/src/ui/public/flux.html              | 157 +++++++++
 storm-core/src/ui/public/images/bolt.png        | Bin 0 -> 6019 bytes
 storm-core/src/ui/public/images/flux.png        | Bin 0 -> 5328 bytes
 storm-core/src/ui/public/images/spout.png       | Bin 0 -> 5255 bytes
 storm-core/src/ui/public/index.html             |   7 +-
 storm-core/src/ui/public/js/cytoscape-dagre.js  | 192 +++++++++++
 storm-core/src/ui/public/js/cytoscape.min.js    |  63 ++++
 storm-core/src/ui/public/js/dagre.min.js        |   6 +
 storm-core/src/ui/public/js/esprima.min.js      |   2 +
 storm-core/src/ui/public/js/js-yaml.min.js      |   3 +
 28 files changed, 1122 insertions(+), 404 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: add STORM-2250 to changelog

Posted by pt...@apache.org.
add STORM-2250 to changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d235a0c1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d235a0c1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d235a0c1

Branch: refs/heads/master
Commit: d235a0c1b49a4ead966d2288e81058495d2ada44
Parents: b8e458f
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Feb 15 16:34:13 2017 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Feb 15 16:34:13 2017 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d235a0c1/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8f731bd..efc3490 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
-## 2.0.0
+\ufeff## 2.0.0
+ * STORM-2250: Kafka Spout Refactoring to Increase Modularity and Testability
  * STORM-2346: Files with unapproved licenses: download-rc-directory.sh verify-release-file.sh
  * STORM-2350: Storm-HDFS's listFilesByModificationTime is broken
  * STORM-1961: Stream api for storm core use cases