Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/05 02:58:00 UTC

[1/4] storm git commit: STORM-2914: Implement ProcessingGuarantee.NONE in the spout instead of using enable.auto.commit

Repository: storm
Updated Branches:
  refs/heads/master 7fbe7a278 -> b5d70e17d


STORM-2914: Implement ProcessingGuarantee.NONE in the spout instead of using enable.auto.commit


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/eff32a32
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/eff32a32
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/eff32a32

Branch: refs/heads/master
Commit: eff32a32bb724c37f4eaca6a363665c55b17813c
Parents: 7fbe7a2
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sat Jan 27 19:22:07 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sun Feb 4 19:41:46 2018 +0100

----------------------------------------------------------------------
 docs/storm-kafka-client.md                      |  47 ++-----
 .../apache/storm/kafka/spout/KafkaSpout.java    |  40 +++---
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 131 ++++++++++---------
 .../storm/kafka/spout/KafkaSpoutConfigTest.java |  13 ++
 .../spout/KafkaSpoutMessagingGuaranteeTest.java |  54 ++++++--
 5 files changed, 160 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/eff32a32/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index 0354a0d..a6f7ac1 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -1,3 +1,8 @@
+---
+title: Storm Kafka Integration (0.10.x+)
+layout: documentation
+documentation: true
+---
 # Storm Apache Kafka integration using the kafka-client jar
 This includes the new Apache Kafka consumer API.
 
@@ -147,18 +152,15 @@ of Java generics.  The deserializers can be specified via the consumer propertie
 There are a few key configs to pay attention to.
 
 `setFirstPollOffsetStrategy` allows you to set where to start consuming data from.  This is used both in case of failure recovery and starting the spout
-for the first time. Allowed values include
+for the first time. The allowed values are listed in the [FirstPollOffsetStrategy javadocs](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html).
 
- * `EARLIEST` means that the kafka spout polls records starting in the first offset of the partition, regardless of previous commits
- * `LATEST` means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits
- * `UNCOMMITTED_EARLIEST` (DEFAULT) means that the kafka spout polls records from the last committed offset, if any. If no offset has been committed, it behaves as `EARLIEST`.
- * `UNCOMMITTED_LATEST` means that the kafka spout polls records from the last committed offset, if any. If no offset has been committed, it behaves as `LATEST`.
+`setProcessingGuarantee` lets you configure what processing guarantees the spout will provide. This affects how soon consumed offsets can be committed, and the frequency of commits. See the [ProcessingGuarantee javadoc](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.ProcessingGuarantee.html) for details.
 
 `setRecordTranslator` allows you to modify how the spout converts a Kafka Consumer Record into a Tuple, and which stream that tuple will be published into.
 By default the "topic", "partition", "offset", "key", and "value" will be emitted to the "default" stream.  If you want to output entries to different
 streams based on the topic, storm provides `ByTopicRecordTranslator`.  See below for more examples on how to use these.
 
-`setProp` and `setProps` can be used to set KafkaConsumer properties. The list of these properties can be found in the KafkaConsumer configuration documentation on the [Kafka website](http://kafka.apache.org/documentation.html#consumerconfigs).
+`setProp` and `setProps` can be used to set KafkaConsumer properties. The list of these properties can be found in the KafkaConsumer configuration documentation on the [Kafka website](http://kafka.apache.org/documentation.html#consumerconfigs). Note that KafkaConsumer autocommit is unsupported. The KafkaSpoutConfig constructor will throw an exception if the "enable.auto.commit" property is set, and the consumer used by the spout will always have that property set to false. You can configure similar behavior to autocommit through the `setProcessingGuarantee` method on the KafkaSpoutConfig builder.
 
 ### Usage Examples
 
@@ -274,7 +276,7 @@ When selecting a kafka client version, you should ensure -
 
 # Kafka Spout Performance Tuning
 
-The Kafka spout provides two internal parameters to control its performance. The parameters can be set using the [KafkaSpoutConfig](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) methods [setOffsetCommitPeriodMs](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L189-L193) and [setMaxUncommittedOffsets](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L211-L217). 
+The Kafka spout provides two internal parameters to control its performance. The parameters can be set using the [setOffsetCommitPeriodMs](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setOffsetCommitPeriodMs-long-) and [setMaxUncommittedOffsets](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setMaxUncommittedOffsets-int-) methods. 
 
 * "offset.commit.period.ms" controls how often the spout commits to Kafka
 * "max.uncommitted.offsets" controls how many offsets can be pending commit before another poll can take place
@@ -284,7 +286,7 @@ The [Kafka consumer config] (http://kafka.apache.org/documentation.html#consumer
 
 * “fetch.min.bytes”
 * “fetch.max.wait.ms”
-* [Kafka Consumer](http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) instance poll timeout, which is specified for each Kafka spout using the [KafkaSpoutConfig](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) method [setPollTimeoutMs](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L180-L184)
+* [Kafka Consumer](http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) instance poll timeout, which is specified for each Kafka spout using the [setPollTimeoutMs](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setPollTimeoutMs-long-) method.
 <br/>
 
 Depending on the structure of your Kafka cluster, distribution of the data, and availability of data to poll, these parameters will have to be configured appropriately. Please refer to the Kafka documentation on Kafka parameter tuning.
@@ -298,35 +300,6 @@ Currently the Kafka spout has has the following default values, which have been
 * max.uncommitted.offsets = 10000000
 <br/>
 
-# Processing Guarantees
-
-The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when an offset is committed to Kafka. This is
-conceptually equivalent to marking the tuple with the `ConsumerRecord` for that offset as being successfully processed
-because the tuple won't get re-emitted in case of failure or time out.
-
-For the AT_LEAST_ONCE and AT_MOST_ONCE processing guarantees the spout controls when the commit happens.
-When the guarantee is NONE Kafka controls when the commit happens.
-
-* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
-     and acked. If a tuple fails or times out it will be re-emitted. A tuple can be processed more than once if for instance
-     the ack gets lost.
-
-* AT_MOST_ONCE - Offsets will be committed to Kafka right after being polled but before being emitted to the downstream
-     components of the topology. Offsets are processed at most once because tuples that fail or timeout won't be retried.
-
-* NONE - the polled offsets are committed to Kafka periodically as controlled by the Kafka properties
-     "enable.auto.commit" and "auto.commit.interval.ms". Because the spout does not control when the commit happens
-     it cannot give any message processing guarantees, i.e. a message may be processed 0, 1 or more times.
-     This option requires "enable.auto.commit=true". If "enable.auto.commit=false" an exception will be thrown.
-
-To set the processing guarantee use the `KafkaSpoutConfig.Builder.setProcessingGuarantee` method as follows:
-
-```java
-KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
-  .builder(String bootstrapServers, String ... topics)
-  .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
-```
-
 # Tuple Tracking
 
 By default the spout only tracks emitted tuples when the processing guarantee is AT_LEAST_ONCE. It may be necessary to track

http://git-wip-us.apache.org/repos/asf/storm/blob/eff32a32/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index c52309e..84e7851 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -39,7 +39,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang.Validate;
@@ -52,6 +51,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
 import org.apache.storm.kafka.spout.internal.CommitMetadata;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
@@ -89,7 +89,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient KafkaSpoutRetryService retryService;
     // Handles tuple events (emit, ack etc.)
     private transient KafkaTupleListener tupleListener;
-    // timer == null if processing guarantee is none or at-most-once
+    // timer == null only if the processing guarantee is at-most-once
     private transient Timer commitTimer;
     // Initialization is only complete after the first call to  KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
 
@@ -133,8 +133,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         tupleListener = kafkaSpoutConfig.getTupleListener();
 
-        if (isAtLeastOnceProcessing()) {
-            // Only used if the spout should commit an offset to Kafka only after the corresponding tuple has been acked.
+        if (kafkaSpoutConfig.getProcessingGuarantee() != KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
+            // In at-most-once mode the offsets are committed after every poll, and not periodically as controlled by the timer
             commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
         }
         refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
@@ -307,8 +307,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 kafkaSpoutConfig.getSubscription().refreshAssignment();
             }
 
-            if (shouldCommit()) {
-                commitOffsetsForAckedTuples(kafkaConsumer.assignment());
+            if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
+                if (isAtLeastOnceProcessing()) {
+                    commitOffsetsForAckedTuples(kafkaConsumer.assignment());
+                } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
+                    commitFetchedOffsetsAsync(kafkaConsumer.assignment());
+                }
             }
 
             PollablePartitionsInfo pollablePartitionsInfo = getPollablePartitionsInfo();
@@ -332,10 +336,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         throw new RuntimeException(new InterruptedException("Kafka consumer was interrupted"));
     }
 
-    private boolean shouldCommit() {
-        return isAtLeastOnceProcessing() && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
-    }
-
     private PollablePartitionsInfo getPollablePartitionsInfo() {
         if (isWaitingToEmit()) {
             LOG.debug("Not polling. Tuples waiting to be emitted.");
@@ -519,6 +519,15 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
     }
 
+    private void commitFetchedOffsetsAsync(Set<TopicPartition> assignedPartitions) {
+        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
+        for (TopicPartition tp : assignedPartitions) {
+            offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp)));
+        }
+        kafkaConsumer.commitAsync(offsetsToCommit, null);
+        LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
+    }
+    
     private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) {
         // Find offsets that are ready to be committed for every assigned topic partition
         final Map<TopicPartition, OffsetManager> assignedOffsetManagers = offsetManagers.entrySet().stream()
@@ -546,11 +555,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 long committedOffset = tpOffset.getValue().offset();
                 if (position < committedOffset) {
                     /*
-                     * The position is behind the committed offset. This can happen in some cases, e.g. if a message failed,
-                     * lots of (more than max.poll.records) later messages were acked, and the failed message then gets acked. 
-                     * The consumer may only be part way through "catching up" to where it was when it went back to retry the failed tuple. 
-                     * Skip the consumer forward to the committed offset and drop the current waiting to emit list,
-                     * since it'll likely contain committed offsets.
+                     * The position is behind the committed offset. This can happen in some cases, e.g. if a message failed, lots of (more
+                     * than max.poll.records) later messages were acked, and the failed message then gets acked. The consumer may only be
+                     * part way through "catching up" to where it was when it went back to retry the failed tuple. Skip the consumer forward
+                     * to the committed offset and drop the current waiting to emit list, since it'll likely contain committed offsets.
                      */
                     LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]",
                         position, committedOffset);
@@ -732,6 +740,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     @VisibleForTesting
     KafkaOffsetMetric getKafkaOffsetMetric() {
-        return  kafkaOffsetMetric;
+        return kafkaOffsetMetric;
     }
 }
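
Note on the KafkaSpout.java change above: the commit step now depends on both the timer and the configured guarantee. The following is a minimal, self-contained sketch of that dispatch, not the spout's actual code. The names `CommitDispatchSketch`, `SimpleTimer` and the action strings are hypothetical stand-ins, and the real spout calls the Kafka consumer where this sketch only records which commit path was taken.

```java
import java.util.ArrayList;
import java.util.List;

final class CommitDispatchSketch {

    enum ProcessingGuarantee { AT_LEAST_ONCE, AT_MOST_ONCE, NO_GUARANTEE }

    /** Hypothetical stand-in for the spout's Timer; the current time is passed in explicitly. */
    static final class SimpleTimer {
        private final long periodMs;
        private long nextExpiryMs;

        SimpleTimer(long startMs, long periodMs) {
            this.periodMs = periodMs;
            this.nextExpiryMs = startMs + periodMs;
        }

        /** Returns true and re-arms the timer once the period has elapsed. */
        boolean isExpiredResetOnTrue(long nowMs) {
            if (nowMs >= nextExpiryMs) {
                nextExpiryMs = nowMs + periodMs;
                return true;
            }
            return false;
        }
    }

    private final ProcessingGuarantee guarantee;
    // null only for AT_MOST_ONCE, which commits after every poll instead of on a timer
    private final SimpleTimer commitTimer;
    // Records which commit path was taken, in place of real consumer calls
    final List<String> actions = new ArrayList<>();

    CommitDispatchSketch(ProcessingGuarantee guarantee, long startMs, long commitPeriodMs) {
        this.guarantee = guarantee;
        this.commitTimer = guarantee != ProcessingGuarantee.AT_MOST_ONCE
            ? new SimpleTimer(startMs, commitPeriodMs)
            : null;
    }

    /** Models the periodic commit check performed before polling. */
    void maybeCommit(long nowMs) {
        if (commitTimer != null && commitTimer.isExpiredResetOnTrue(nowMs)) {
            if (guarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
                // Real spout: synchronously commit offsets of acked tuples
                actions.add("commit-acked-sync");
            } else if (guarantee == ProcessingGuarantee.NO_GUARANTEE) {
                // Real spout: asynchronously commit the consumer's current positions
                actions.add("commit-position-async");
            }
        }
    }

    public static void main(String[] args) {
        CommitDispatchSketch sketch =
            new CommitDispatchSketch(ProcessingGuarantee.NO_GUARANTEE, 0L, 100L);
        sketch.maybeCommit(50L);   // timer not yet expired
        sketch.maybeCommit(100L);  // timer expired, async commit path taken
        sketch.maybeCommit(150L);  // timer was re-armed at 100, not yet expired
        System.out.println(sketch.actions);
    }
}
```

In this model AT_MOST_ONCE never commits from the timer, which mirrors the `commitTimer != null` check in the patch.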

http://git-wip-us.apache.org/repos/asf/storm/blob/eff32a32/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index a063790..c2305cb 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.storm.Config;
+import org.apache.storm.annotation.InterfaceStability;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
 import org.apache.storm.kafka.spout.subscription.ManualPartitionSubscription;
 import org.apache.storm.kafka.spout.subscription.NamedTopicFilter;
@@ -96,7 +97,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
      * @param builder The Builder to construct the KafkaSpoutConfig from
      */
     public KafkaSpoutConfig(Builder<K, V> builder) {
-        setAutoCommitMode(builder);
+        setKafkaPropsForProcessingGuarantee(builder);
         this.kafkaProps = builder.kafkaProps;
         this.subscription = builder.subscription;
         this.translator = builder.translator;
@@ -115,23 +116,26 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
 
     /**
      * Defines how the {@link KafkaSpout} seeks the offset to be used in the first poll to Kafka upon topology deployment.
-     * By default this parameter is set to UNCOMMITTED_EARLIEST. If the strategy is set to:
-     * <br/>
-     * <ul>
-     * <li>EARLIEST - the kafka spout polls records starting in the first offset of the partition, regardless
-     * of previous commits. This setting only takes effect on topology deployment.</li>
-     * <li>LATEST - the kafka spout polls records with offsets greater than the last offset in the partition,
-     * regardless of previous commits. This setting only takes effect on topology deployment.</li>
-     * <li>UNCOMMITTED_EARLIEST - the kafka spout polls records from the last committed offset, if any. If no offset has been
-     * committed it behaves as EARLIEST.</li>
-     * <li>UNCOMMITTED_LATEST - the kafka spout polls records from the last committed offset, if any. If no offset has been
-     * committed it behaves as LATEST.</li>
-     * </ul>
+     * By default this parameter is set to UNCOMMITTED_EARLIEST. 
      */
     public enum FirstPollOffsetStrategy {
+        /**
+         * The kafka spout polls records starting in the first offset of the partition, regardless of previous commits. This setting only
+         * takes effect on topology deployment
+         */
         EARLIEST,
+        /**
+         * The kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits. This
+         * setting only takes effect on topology deployment
+         */
         LATEST,
+        /**
+         * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as EARLIEST
+         */
         UNCOMMITTED_EARLIEST,
+        /**
+         * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as LATEST
+         */
         UNCOMMITTED_LATEST;
 
         @Override
@@ -142,28 +146,30 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
 
     /**
      * This enum controls when the tuple with the {@link ConsumerRecord} for an offset is marked as processed,
-     * i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE the spout controls when
-     * the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
-     *
-     * <ul>
-     * <li>AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
-     * and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance
-     * the ack gets lost.</li>
-     * <br/>
-     * <li>AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted
-     * to the downstream components of the topology. It guarantees that the offset is processed at-most-once because it
-     * won't retry tuples that fail or timeout after the commit to Kafka has been done.</li>
-     * <br/>
-     * <li>NONE - the polled offsets are committed to Kafka periodically as controlled by the Kafka properties
-     * "enable.auto.commit" and "auto.commit.interval.ms". Because the spout does not control when the commit happens
-     * it cannot give any message processing guarantees, i.e. a message may be processed 0, 1 or more times.
-     * This option requires "enable.auto.commit=true". If "enable.auto.commit=false" an exception will be thrown.</li>
-     * </ul>
+     * i.e. when the offset can be committed to Kafka. The default value is AT_LEAST_ONCE.
+     * The commit interval is controlled by {@link KafkaSpoutConfig#getOffsetsCommitPeriodMs() }, if the mode commits on an interval.
+     * NO_GUARANTEE may be removed in a later release without warning, we're still evaluating whether it makes sense to keep.
      */
+    @InterfaceStability.Unstable
     public enum ProcessingGuarantee {
+        /**
+         * An offset is ready to commit only after the corresponding tuple has been processed and acked (at least once). If a tuple fails or
+         * times out it will be re-emitted, as controlled by the {@link KafkaSpoutRetryService}. Commits synchronously on the defined
+         * interval.
+         */
         AT_LEAST_ONCE,
+        /**
+         * Every offset will be synchronously committed to Kafka right after being polled but before being emitted to the downstream
+         * components of the topology. The commit interval is ignored. This mode guarantees that the offset is processed at most once by
+         * ensuring the spout won't retry tuples that fail or time out after the commit to Kafka has been done
+         */
         AT_MOST_ONCE,
-        NONE,
+        /**
+         * The polled offsets are ready to commit immediately after being polled. The offsets are committed periodically, i.e. a message may
+         * be processed 0, 1 or more times. This behavior is similar to setting enable.auto.commit=true in the consumer, but allows the
+         * spout to control when commits occur. Commits asynchronously on the defined interval.
+         */
+        NO_GUARANTEE,
     }
 
     public static class Builder<K, V> {
@@ -213,7 +219,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         }
 
         /**
-         * Set a {@link KafkaConsumer} property.
+         * Set a {@link KafkaConsumer} property. 
          */
         public Builder<K, V> setProp(String key, Object value) {
             kafkaProps.put(key, value);
@@ -221,7 +227,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         }
 
         /**
-         * Set multiple {@link KafkaConsumer} properties.
+         * Set multiple {@link KafkaConsumer} properties. 
          */
         public Builder<K, V> setProp(Map<String, Object> props) {
             kafkaProps.putAll(props);
@@ -229,7 +235,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         }
 
         /**
-         * Set multiple {@link KafkaConsumer} properties.
+         * Set multiple {@link KafkaConsumer} properties. 
          */
         public Builder<K, V> setProp(Properties props) {
             props.forEach((key, value) -> {
@@ -256,7 +262,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         /**
          * Specifies the period, in milliseconds, the offset commit task is periodically called. Default is 15s.
          *
-         * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
+         * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE} or
+         * {@link ProcessingGuarantee#NO_GUARANTEE}.
          *
          * @param offsetCommitPeriodMs time in ms
          */
@@ -453,37 +460,37 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return builder;
     }
 
-    private static void setAutoCommitMode(Builder<?, ?> builder) {
+    private static void setKafkaPropsForProcessingGuarantee(Builder<?, ?> builder) {
         if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-            throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
-                + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee");
+            throw new IllegalStateException("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+                + " setting is not supported. You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
         }
-        if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
-            builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
-        } else {
-            String autoOffsetResetPolicy = (String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-            if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
-                if (autoOffsetResetPolicy == null) {
-                    /*
-                    If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
-                    for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
-                    error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer 
-                    requests an offset that was deleted.
-                     */
-                    builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-                } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
-                    LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
-                        + " Some messages may be skipped.");
-                }
-            } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
-                if (autoOffsetResetPolicy != null
-                    && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
-                    LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
-                        + " Some messages may be processed more than once.");
-                }
+        String autoOffsetResetPolicy = (String) builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+        if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
+            if (autoOffsetResetPolicy == null) {
+                /*
+                 * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
+                 * for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
+                 * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
+                 * requests an offset that was deleted.
+                 */
+                LOG.info("Setting consumer property '{}' to 'earliest' to ensure at-least-once processing",
+                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+                builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+            } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
+                LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
+                    + " Some messages may be skipped.");
+            }
+        } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
+            if (autoOffsetResetPolicy != null
+                && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
+                LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
+                    + " Some messages may be processed more than once.");
             }
-            builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
         }
+        LOG.info("Setting consumer property '{}' to 'false', because the spout does not support auto-commit",
+            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+        builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
     }
 
     /**
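
Note on setKafkaPropsForProcessingGuarantee above: the method rejects a user-supplied "enable.auto.commit", defaults "auto.offset.reset" to "earliest" for at-least-once processing, and always forces auto-commit off. The sketch below models that rule with plain maps so it can run without Kafka or Storm on the classpath. The class and method names are hypothetical, the property keys are written as string literals rather than ConsumerConfig constants, the warning log statements are left out, and unlike the patch it returns a copy instead of mutating the builder's map.

```java
import java.util.HashMap;
import java.util.Map;

final class ProcessingGuaranteePropsSketch {

    enum ProcessingGuarantee { AT_LEAST_ONCE, AT_MOST_ONCE, NO_GUARANTEE }

    // Kafka consumer property names, as string literals for self-containment
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";
    static final String AUTO_OFFSET_RESET = "auto.offset.reset";

    /**
     * Returns a copy of the user properties adjusted for the given guarantee.
     * Throws if the caller tried to configure consumer auto-commit directly.
     */
    static Map<String, Object> applyGuarantee(Map<String, Object> userProps,
                                              ProcessingGuarantee guarantee) {
        if (userProps.containsKey(ENABLE_AUTO_COMMIT)) {
            throw new IllegalStateException("The KafkaConsumer " + ENABLE_AUTO_COMMIT
                + " setting is not supported");
        }
        Map<String, Object> props = new HashMap<>(userProps);
        if (guarantee == ProcessingGuarantee.AT_LEAST_ONCE
            && !props.containsKey(AUTO_OFFSET_RESET)) {
            // Prefer replaying from the earliest offset over skipping to the latest
            props.put(AUTO_OFFSET_RESET, "earliest");
        }
        // The spout commits offsets itself, so consumer auto-commit is always disabled
        props.put(ENABLE_AUTO_COMMIT, false);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props =
            applyGuarantee(new HashMap<>(), ProcessingGuarantee.AT_LEAST_ONCE);
        System.out.println(props.get(AUTO_OFFSET_RESET));
        System.out.println(props.get(ENABLE_AUTO_COMMIT));
    }
}
```

An explicitly configured "auto.offset.reset" is left untouched in every mode, matching the patch's intent to respect the user's choice.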

http://git-wip-us.apache.org/repos/asf/storm/blob/eff32a32/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
index 9885cf6..17e0700 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -28,10 +28,15 @@ import java.util.HashMap;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 public class KafkaSpoutConfigTest {
 
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+    
     @Test
     public void testBasic() {
         KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic").build();
@@ -85,4 +90,12 @@ public class KafkaSpoutConfigTest {
 
         assertEquals(100, conf.getMetricsTimeBucketSizeInSecs());
     }
+    
+    @Test
+    public void testThrowsIfEnableAutoCommitIsSet() {
+        expectedException.expect(IllegalStateException.class);
+        KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
+            .build();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/eff32a32/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
index 353ee36..12391c8 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -17,6 +17,7 @@
 package org.apache.storm.kafka.spout;
 
 import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertThat;
@@ -24,6 +25,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -37,6 +39,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.kafka.spout.subscription.Subscription;
@@ -46,11 +49,18 @@ import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Time.SimulatedTime;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.InOrder;
+import org.mockito.runners.MockitoJUnitRunner;
 
+@RunWith(MockitoJUnitRunner.class)
 public class KafkaSpoutMessagingGuaranteeTest {
 
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+    
     private final TopologyContext contextMock = mock(TopologyContext.class);
     private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
     private final Map<String, Object> conf = new HashMap<>();
@@ -109,10 +119,10 @@ public class KafkaSpoutMessagingGuaranteeTest {
     }
 
     @Test
-    public void testAnyTimesModeDisregardsMaxUncommittedOffsets() throws Exception {
+    public void testNoGuaranteeModeDisregardsMaxUncommittedOffsets() throws Exception {
         //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode
         KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
-            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
             .build();
         doTestModeDisregardsMaxUncommittedOffsets(spoutConfig);
     }
@@ -153,10 +163,10 @@ public class KafkaSpoutMessagingGuaranteeTest {
     }
 
     @Test
-    public void testAnyTimesModeCannotReplayTuples() throws Exception {
-        //When tuple tracking is enabled, the spout must not replay tuples in any-times mode
+    public void testNoGuaranteeModeCannotReplayTuples() throws Exception {
+        //When tuple tracking is enabled, the spout must not replay tuples in no guarantee mode
         KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
-            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
             .setTupleTrackingEnforced(true)
             .build();
         doTestModeCannotReplayTuples(spoutConfig);
@@ -177,7 +187,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
 
             spout.ack(msgIdCaptor.getValue());
             
-            Time.advanceTime(spoutConfig.getOffsetsCommitPeriodMs());
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
             
             spout.nextTuple();
             
@@ -196,13 +206,37 @@ public class KafkaSpoutMessagingGuaranteeTest {
     }
     
     @Test
-    public void testAnyTimesModeDoesNotCommitAckedTuples() throws Exception {
-        //When tuple tracking is enabled, the spout must not commit acked tuples in any-times mode because committing is managed by the consumer
+    public void testNoGuaranteeModeCommitsPolledTuples() throws Exception {
+        //When using the no guarantee mode, the spout must commit tuples periodically, regardless of whether they've been acked
         KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
-            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
             .setTupleTrackingEnforced(true)
             .build();
-        doTestModeDoesNotCommitAckedTuples(spoutConfig);
+        
+        try (SimulatedTime time = new SimulatedTime()) {
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock,partition);
+
+            when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+                SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1))));
+
+            spout.nextTuple();
+            
+            when(consumerMock.position(partition)).thenReturn(1L);
+
+            ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
+            assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
+            
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
+            
+            spout.nextTuple();
+            
+            verify(consumerMock).commitAsync(commitCapture.capture(), isNull());
+            
+            Map<TopicPartition, OffsetAndMetadata> commit = commitCapture.getValue();
+            assertThat(commit.containsKey(partition), is(true));
+            assertThat(commit.get(partition).offset(), is(1L));
+        }
     }
 
 }
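
Note on testNoGuaranteeModeCommitsPolledTuples: the test advances simulated time by KafkaSpout.TIMER_DELAY_MS plus the commit period, then expects an async commit of the consumer position (offset 1) even though no tuple was acked. The sketch below models only that timing rule with plain Java collections. All names are hypothetical and partitions are plain strings. The assumption that the first commit becomes due at start + initial delay + period is inferred from how far the test advances the clock, not from the spout's timer source.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of "commit whatever was polled, on a timer, regardless of acks".
final class PeriodicPositionCommitSketch {

    private final long commitPeriodMs;
    private long nextCommitAtMs;
    // Next offset to fetch per partition, i.e. the consumer position.
    private final Map<String, Long> positions = new HashMap<>();
    // Last committed offset per partition.
    private final Map<String, Long> committed = new HashMap<>();

    PeriodicPositionCommitSketch(long nowMs, long initialDelayMs, long commitPeriodMs) {
        this.commitPeriodMs = commitPeriodMs;
        // Assumption: the first commit is due after the initial delay plus one full period.
        this.nextCommitAtMs = nowMs + initialDelayMs + commitPeriodMs;
    }

    // Called after a poll; nextOffset is the offset following the last polled record.
    void recordPolled(String partition, long nextOffset) {
        positions.put(partition, nextOffset);
    }

    // Commits all current positions if the timer has expired; acks play no role here.
    boolean maybeCommit(long nowMs) {
        if (nowMs < nextCommitAtMs) {
            return false;
        }
        committed.putAll(positions);
        nextCommitAtMs = nowMs + commitPeriodMs;
        return true;
    }

    Map<String, Long> committed() {
        return committed;
    }

    public static void main(String[] args) {
        PeriodicPositionCommitSketch sketch = new PeriodicPositionCommitSketch(0L, 500L, 1000L);
        sketch.recordPolled("topic-0", 1L);
        System.out.println(sketch.maybeCommit(1499L)); // timer not yet expired
        System.out.println(sketch.maybeCommit(1500L)); // timer expired, position committed
        System.out.println(sketch.committed());
    }
}
```

Because the commit follows the position rather than the acks, a crash between commit and processing can lose tuples, and a crash before the commit can replay them, which matches the "no guarantee" naming.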


[3/4] storm git commit: Merge branch 'STORM-2914' of https://github.com/srdo/storm into STORM-2914-merge

Posted by ka...@apache.org.
Merge branch 'STORM-2914' of https://github.com/srdo/storm into STORM-2914-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ffa5f05d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ffa5f05d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ffa5f05d

Branch: refs/heads/master
Commit: ffa5f05df151e01c662fa8d74f797138650099d7
Parents: 7fbe7a2 eff32a3
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Feb 5 10:56:56 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Feb 5 10:56:56 2018 +0900

----------------------------------------------------------------------
 docs/storm-kafka-client.md                      |  47 ++-----
 .../apache/storm/kafka/spout/KafkaSpout.java    |  40 +++---
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 131 ++++++++++---------
 .../storm/kafka/spout/KafkaSpoutConfigTest.java |  13 ++
 .../spout/KafkaSpoutMessagingGuaranteeTest.java |  54 ++++++--
 5 files changed, 160 insertions(+), 125 deletions(-)
----------------------------------------------------------------------



[2/4] storm git commit: STORM-2913: Add metadata to at-most-once and at-least-once commits

Posted by ka...@apache.org.
STORM-2913: Add metadata to at-most-once and at-least-once commits


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e756889a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e756889a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e756889a

Branch: refs/heads/master
Commit: e756889aa712aee22c216bc99ee17b972abf886d
Parents: eff32a3
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sat Jan 27 15:15:45 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sun Feb 4 23:18:54 2018 +0100

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    | 80 ++++++-----------
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  4 +-
 .../spout/internal/CommitMetadataManager.java   | 91 ++++++++++++++++++++
 .../spout/KafkaSpoutMessagingGuaranteeTest.java | 45 ++++++----
 4 files changed, 146 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e756889a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 84e7851..9d133a7 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -23,8 +23,6 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
@@ -53,6 +51,7 @@ import org.apache.kafka.common.errors.RetriableException;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
 import org.apache.storm.kafka.spout.internal.CommitMetadata;
+import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
 import org.apache.storm.kafka.spout.internal.OffsetManager;
@@ -72,7 +71,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     //Initial delay for the commit and subscription refresh timers
     public static final long TIMER_DELAY_MS = 500;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
-    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
 
     // Storm
     protected SpoutOutputCollector collector;
@@ -104,8 +102,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     // Triggers when a subscription should be refreshed
     private transient Timer refreshSubscriptionTimer;
     private transient TopologyContext context;
-    // Metadata information to commit to Kafka. It is unique per spout per topology.
-    private transient String commitMetadata;
+    private transient CommitMetadataManager commitMetadataManager;
     private transient KafkaOffsetMetric kafkaOffsetMetric;
 
     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
@@ -142,7 +139,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         offsetManagers = new HashMap<>();
         emitted = new HashSet<>();
         waitingToEmit = new HashMap<>();
-        setCommitMetadata(context);
+        commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());
 
         tupleListener.open(conf, context);
         if (canRegisterMetrics()) {
@@ -154,7 +151,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private void registerMetric() {
         LOG.info("Registering Spout Metrics");
-        kafkaOffsetMetric = new KafkaOffsetMetric(() -> offsetManagers, () -> kafkaConsumer);
+        kafkaOffsetMetric = new KafkaOffsetMetric(() -> Collections.unmodifiableMap(offsetManagers), () -> kafkaConsumer);
         context.registerMetric("kafkaOffset", kafkaOffsetMetric, kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs());
     }
 
@@ -168,16 +165,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         return true;
     }
 
-    private void setCommitMetadata(TopologyContext context) {
-        try {
-            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
-                context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
-        } catch (JsonProcessingException e) {
-            LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
-            throw new RuntimeException(e);
-        }
-    }
-
     private boolean isAtLeastOnceProcessing() {
         return kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE;
     }
@@ -215,8 +202,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 retryService.retainAll(partitions);
 
                 /*
-                 * Emitted messages for partitions that are no longer assigned to this spout can't
-                 * be acked and should not be retried, hence remove them from emitted collection.
+                 * Emitted messages for partitions that are no longer assigned to this spout can't be acked and should not be retried, hence
+                 * remove them from emitted collection.
                  */
                 emitted.removeIf(msgId -> !partitions.contains(msgId.getTopicPartition()));
             }
@@ -246,7 +233,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
             if (committedOffset != null) {
                 // offset was previously committed for this consumer group and topic-partition, either by this or another topology.
-                if (isOffsetCommittedByThisTopology(newTp, committedOffset)) {
+                if (commitMetadataManager.isOffsetCommittedByThisTopology(newTp, committedOffset, Collections.unmodifiableMap(offsetManagers))) {
                     // Another KafkaSpout instance (of this topology) already committed, therefore FirstPollOffsetStrategy does not apply.
                     kafkaConsumer.seek(newTp, committedOffset.offset());
                 } else {
@@ -274,31 +261,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
     }
 
-    /**
-     * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology. This info is used to decide if
-     * {@link FirstPollOffsetStrategy} should be applied
-     *
-     * @param tp topic-partition
-     * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
-     * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
-     */
-    private boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset) {
-        try {
-            if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).hasCommitted()) {
-                return true;
-            }
-
-            final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
-            return committedMetadata.getTopologyId().equals(context.getStormId());
-        } catch (IOException e) {
-            LOG.warn("Failed to deserialize [{}]. Error likely occurred because the last commit "
-                + "for this topic-partition was done using an earlier version of Storm. "
-                + "Defaulting to behavior compatible with earlier version", committedOffset);
-            LOG.trace("", e);
-            return false;
-        }
-    }
-
     // ======== Next Tuple =======
     @Override
     public void nextTuple() {
@@ -311,7 +273,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 if (isAtLeastOnceProcessing()) {
                     commitOffsetsForAckedTuples(kafkaConsumer.assignment());
                 } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
-                    commitFetchedOffsetsAsync(kafkaConsumer.assignment());
+                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
+                        createFetchedOffsetsMetadata(kafkaConsumer.assignment());
+                    kafkaConsumer.commitAsync(offsetsToCommit, null);
+                    LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
                 }
             }
 
@@ -396,7 +361,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 numPolledRecords);
             if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
                 //Commit polled records immediately to ensure delivery is at-most-once.
-                kafkaConsumer.commitSync();
+                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
+                    createFetchedOffsetsMetadata(kafkaConsumer.assignment());
+                kafkaConsumer.commitSync(offsetsToCommit);
+                LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
             }
             return consumerRecords;
         } finally {
@@ -469,11 +437,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
         } else {
             final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
-            if (committedOffset != null && isOffsetCommittedByThisTopology(tp, committedOffset)
-                && committedOffset.offset() > record.offset()) {
+            if (isAtLeastOnceProcessing()
+                && committedOffset != null 
+                && committedOffset.offset() > record.offset()
+                && commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, Collections.unmodifiableMap(offsetManagers))) {
                 // Ensures that after a topology with this id is started, the consumer fetch
                 // position never falls behind the committed offset (STORM-2844)
-                throw new IllegalStateException("Attempting to emit a message that has already been committed.");
+                throw new IllegalStateException("Attempting to emit a message that has already been committed."
+                    + " This should never occur when using the at-least-once processing guarantee.");
             }
 
             final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
@@ -519,13 +490,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
     }
 
-    private void commitFetchedOffsetsAsync(Set<TopicPartition> assignedPartitions) {
+    private Map<TopicPartition, OffsetAndMetadata> createFetchedOffsetsMetadata(Set<TopicPartition> assignedPartitions) {
         Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
         for (TopicPartition tp : assignedPartitions) {
-            offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp)));
+            offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp), commitMetadataManager.getCommitMetadata()));
         }
-        kafkaConsumer.commitAsync(offsetsToCommit, null);
-        LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
+        return offsetsToCommit;
     }
     
     private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) {
@@ -536,7 +506,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
         for (Map.Entry<TopicPartition, OffsetManager> tpOffset : assignedOffsetManagers.entrySet()) {
-            final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadata);
+            final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadataManager.getCommitMetadata());
             if (nextCommitOffset != null) {
                 nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
             }
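
Note on createFetchedOffsetsMetadata: the refactored helper pairs each assigned partition's current position with the spout's commit metadata and returns the map, leaving the caller to choose commitSync (at-most-once) or commitAsync (no guarantee). A dependency-free sketch of that separation is below. OffsetWithMetadata is a hypothetical stand-in for Kafka's OffsetAndMetadata, partitions are plain strings, and the position lookup is passed in as a function instead of coming from a consumer.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.ToLongFunction;

final class FetchedOffsetsSketch {

    // Hypothetical stand-in for Kafka's OffsetAndMetadata.
    static final class OffsetWithMetadata {
        final long offset;
        final String metadata;

        OffsetWithMetadata(long offset, String metadata) {
            this.offset = offset;
            this.metadata = metadata;
        }
    }

    // Builds the commit payload only; committing is left to the caller.
    static Map<String, OffsetWithMetadata> createFetchedOffsets(
        Set<String> assignedPartitions, ToLongFunction<String> positionOf, String commitMetadata) {
        Map<String, OffsetWithMetadata> offsetsToCommit = new HashMap<>();
        for (String tp : assignedPartitions) {
            offsetsToCommit.put(tp, new OffsetWithMetadata(positionOf.applyAsLong(tp), commitMetadata));
        }
        return offsetsToCommit;
    }

    public static void main(String[] args) {
        Set<String> assigned = new LinkedHashSet<>();
        assigned.add("topic-0");
        // The metadata string here is a placeholder, not the spout's real format.
        Map<String, OffsetWithMetadata> commit = createFetchedOffsets(assigned, tp -> 1L, "demo-metadata");
        System.out.println(commit.get("topic-0").offset + " " + commit.get("topic-0").metadata);
    }
}
```

Returning the map instead of committing inside the helper is what lets both processing guarantees share it in the diff above.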

http://git-wip-us.apache.org/repos/asf/storm/blob/e756889a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index c2305cb..40e449a 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -474,7 +474,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
                  * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
                  * requests an offset that was deleted.
                  */
-                LOG.info("Setting consumer property '{}' to 'earliest' to ensure at-least-once processing",
+                LOG.info("Setting Kafka consumer property '{}' to 'earliest' to ensure at-least-once processing",
                     ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
                 builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
             } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
@@ -488,7 +488,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
                     + " Some messages may be processed more than once.");
             }
         }
-        LOG.info("Setting consumer property '{}' to 'false', because the spout does not support auto-commit",
+        LOG.info("Setting Kafka consumer property '{}' to 'false', because the spout does not support auto-commit",
             ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
         builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/e756889a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
new file mode 100644
index 0000000..a63619c
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public final class CommitMetadataManager {
+
+    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadataManager.class);
+    // Metadata information to commit to Kafka. It is unique per spout instance.
+    private final String commitMetadata;
+    private final ProcessingGuarantee processingGuarantee;
+    private final TopologyContext context;
+
+    /**
+     * Create a manager with the given context.
+     */
+    public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
+        this.context = context;
+        try {
+            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
+                context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
+            this.processingGuarantee = processingGuarantee;
+        } catch (JsonProcessingException e) {
+            LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology.
+     *
+     * @param tp The topic partition the commit metadata belongs to.
+     * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
+     * @param offsetManagers The offset managers.
+     * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
+     */
+    public boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset,
+        Map<TopicPartition, OffsetManager> offsetManagers) {
+        try {
+            if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE
+                && offsetManagers.containsKey(tp)
+                && offsetManagers.get(tp).hasCommitted()) {
+                return true;
+            }
+
+            final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
+            return committedMetadata.getTopologyId().equals(context.getStormId());
+        } catch (IOException e) {
+            LOG.warn("Failed to deserialize expected commit metadata [{}]."
+                + " This error is expected to occur once per partition, if the last commit to each partition"
+                + " was by an earlier version of the KafkaSpout, or by a process other than the KafkaSpout. "
+                + "Defaulting to behavior compatible with earlier version", committedOffset);
+            LOG.trace("", e);
+            return false;
+        }
+    }
+
+    public String getCommitMetadata() {
+        return commitMetadata;
+    }
+
+}
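
Note on CommitMetadataManager: isOffsetCommittedByThisTopology reads the metadata stored with a committed offset, compares its topology id with the current one, and falls back to false when the metadata cannot be parsed (for example, a commit made by an older spout version or by another process). The sketch below shows only that decision. To stay dependency-free it uses a hypothetical pipe-delimited encoding instead of the Jackson JSON used by the real class, and it omits the at-least-once shortcut through the offset managers. All names are illustrative.

```java
// Hypothetical, dependency-free model of the "was this committed by my topology?" check.
final class CommitOwnershipSketch {

    // Assumption: a simple "topologyId|taskId|threadName" format, NOT the real JSON metadata.
    static String encode(String topologyId, int taskId, String threadName) {
        return topologyId + "|" + taskId + "|" + threadName;
    }

    // Returns true only if the metadata parses and names the current topology.
    static boolean committedByTopology(String committedMetadata, String currentTopologyId) {
        if (committedMetadata == null) {
            return false;
        }
        String[] parts = committedMetadata.split("\\|", -1);
        if (parts.length != 3) {
            // Unparseable metadata: treat the commit as foreign, as the real code does on IOException.
            return false;
        }
        return parts[0].equals(currentTopologyId);
    }

    public static void main(String[] args) {
        String metadata = encode("topo-1", 3, "Thread-7");
        System.out.println(committedByTopology(metadata, "topo-1")); // same topology
        System.out.println(committedByTopology(metadata, "topo-2")); // different topology
        System.out.println(committedByTopology("", "topo-1"));       // empty metadata
    }
}
```

Treating unreadable metadata as "not ours" means the first poll offset strategy applies in that case, which is the backward-compatible behavior the log message in the diff describes. Note that the delimiter choice here would break for ids containing a pipe character; it is only for illustration.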

http://git-wip-us.apache.org/repos/asf/storm/blob/e756889a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
index 12391c8..a9e7c6c 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -21,11 +21,12 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertThat;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -42,6 +43,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
 import org.apache.storm.kafka.spout.subscription.Subscription;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -84,12 +86,19 @@ public class KafkaSpoutMessagingGuaranteeTest {
             SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1))));
 
         spout.nextTuple();
+        
+        when(consumerMock.position(partition)).thenReturn(1L);
 
         //The spout should have emitted the tuple, and must have committed it before emit
         InOrder inOrder = inOrder(consumerMock, collectorMock);
         inOrder.verify(consumerMock).poll(anyLong());
-        inOrder.verify(consumerMock).commitSync();
+        inOrder.verify(consumerMock).commitSync(commitCapture.capture());
         inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
+        
+        CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
+        assertThat(committedOffsets.get(partition).offset(), is(0L));
+        assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata()));
     }
 
     private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig<String, String> spoutConfig) {
@@ -172,7 +181,13 @@ public class KafkaSpoutMessagingGuaranteeTest {
         doTestModeCannotReplayTuples(spoutConfig);
     }
 
-    private void doTestModeDoesNotCommitAckedTuples(KafkaSpoutConfig<String, String> spoutConfig) {
+    @Test
+    public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception {
+        //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .setTupleTrackingEnforced(true)
+            .build();
         try (SimulatedTime time = new SimulatedTime()) {
             KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock,partition);
 
@@ -180,6 +195,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
                 SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1))));
 
             spout.nextTuple();
+            clearInvocations(consumerMock);
 
             ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
             verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
@@ -189,21 +205,15 @@ public class KafkaSpoutMessagingGuaranteeTest {
             
             Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
             
+            when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
+            
             spout.nextTuple();
             
-            verify(consumerMock, never()).commitSync(any());
+            verify(consumerMock, never()).commitSync(argThat(arg -> {
+                return !arg.containsKey(partition);
+            }));
         }
     }
-
-    @Test
-    public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception {
-        //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted
-        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
-            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
-            .setTupleTrackingEnforced(true)
-            .build();
-        doTestModeDoesNotCommitAckedTuples(spoutConfig);
-    }
     
     @Test
     public void testNoGuaranteeModeCommitsPolledTuples() throws Exception {
@@ -233,9 +243,10 @@ public class KafkaSpoutMessagingGuaranteeTest {
             
             verify(consumerMock).commitAsync(commitCapture.capture(), isNull());
             
-            Map<TopicPartition, OffsetAndMetadata> commit = commitCapture.getValue();
-            assertThat(commit.containsKey(partition), is(true));
-            assertThat(commit.get(partition).offset(), is(1L));
+            CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE);
+            Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
+            assertThat(committedOffsets.get(partition).offset(), is(1L));
+            assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata()));
         }
     }
 


[4/4] storm git commit: Merge branch 'STORM-2913' of https://github.com/srdo/storm into STORM-2913-merge

Posted by ka...@apache.org.
Merge branch 'STORM-2913' of https://github.com/srdo/storm into STORM-2913-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b5d70e17
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b5d70e17
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b5d70e17

Branch: refs/heads/master
Commit: b5d70e17d798f91417b74863772f0826b4a84625
Parents: ffa5f05 e756889
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Feb 5 10:57:33 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Feb 5 10:57:33 2018 +0900

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    | 80 ++++++-----------
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  4 +-
 .../spout/internal/CommitMetadataManager.java   | 91 ++++++++++++++++++++
 .../spout/KafkaSpoutMessagingGuaranteeTest.java | 45 ++++++----
 4 files changed, 146 insertions(+), 74 deletions(-)
----------------------------------------------------------------------