Posted to commits@storm.apache.org by sr...@apache.org on 2016/08/29 19:05:13 UTC

[1/2] storm git commit: Merge branch 'Apache_master_STORM-2052_KSPI' of https://github.com/hmcl/storm-apache into STORM-2052

Repository: storm
Updated Branches:
  refs/heads/1.x-branch f9d47afeb -> 9ae6e9873


Merge branch 'Apache_master_STORM-2052_KSPI' of https://github.com/hmcl/storm-apache into STORM-2052


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4c231b4c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4c231b4c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4c231b4c

Branch: refs/heads/1.x-branch
Commit: 4c231b4c3f0ef46f1fd4fdbd9ae7be80e94a1d77
Parents: f9d47af
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Mon Aug 29 11:52:12 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Aug 29 11:57:11 2016 -0700

----------------------------------------------------------------------
 external/storm-kafka-client/README.md           | 28 +++++++++++
 .../apache/storm/kafka/spout/KafkaSpout.java    | 50 ++++++++++++++------
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  6 +--
 3 files changed, 66 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4c231b4c/external/storm-kafka-client/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/README.md b/external/storm-kafka-client/README.md
index 515f9ac..5784b8a 100644
--- a/external/storm-kafka-client/README.md
+++ b/external/storm-kafka-client/README.md
@@ -157,6 +157,34 @@ When selecting a kafka client version, you should ensure -
  0.8.x broker.
 
 
+#Kafka Spout Performance Tuning
+
+The Kafka spout provides two internal parameters to control its performance. The parameters can be set using the [KafkaSpoutConfig](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) methods [setOffsetCommitPeriodMs](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L189-L193) and [setMaxUncommittedOffsets](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L211-L217), as illustrated in the sketch after the list below.
+
+* "offset.commit.period.ms" controls how often the spout commits to Kafka
+* "max.uncommitted.offsets" controls how many offsets can be pending commit before another poll can take place
+<br/>
+
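+For illustration, here is a minimal sketch of setting both parameters. It assumes a `KafkaSpoutConfig.Builder` named `builder` has already been constructed elsewhere, and the values shown are simply the current defaults, not a recommendation:
+
+```java
+KafkaSpoutConfig<String, String> spoutConfig = builder
+        .setOffsetCommitPeriodMs(30_000)       // "offset.commit.period.ms": commit acked offsets every 30s
+        .setMaxUncommittedOffsets(10_000_000)  // "max.uncommitted.offsets": stop polling once this many offsets are pending commit
+        .build();
+```
+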
+The [Kafka consumer config](http://kafka.apache.org/documentation.html#consumerconfigs) parameters may also have an impact on the performance of the spout. The following Kafka parameters are likely the most influential on spout performance (a short sketch follows the list):
+
+* "fetch.min.bytes"
+* "fetch.max.wait.ms"
+* [Kafka Consumer](http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) instance poll timeout, which is specified for each Kafka spout using the [KafkaSpoutConfig](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) method [setPollTimeoutMs](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L180-L184)
+<br/>
+
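+As an illustration only (the property keys are standard Kafka consumer settings, but the values here are placeholders, not tuning advice), the Kafka-side parameters go into the consumer properties handed to the spout, while the poll timeout is set through the hypothetical `builder` from the previous sketch:
+
+```java
+Map<String, Object> kafkaProps = new HashMap<>();
+kafkaProps.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
+kafkaProps.put("fetch.min.bytes", 1);                   // broker replies as soon as this many bytes are available
+kafkaProps.put("fetch.max.wait.ms", 500);               // ...or after waiting at most this long
+
+KafkaSpoutConfig<String, String> spoutConfig = builder   // builder assumed constructed with kafkaProps
+        .setPollTimeoutMs(200)                           // timeout handed to KafkaConsumer.poll(), in ms
+        .build();
+```
+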
+Depending on the structure of your Kafka cluster, the distribution of the data, and the availability of data to poll, these parameters will have to be configured appropriately. Please refer to the Kafka documentation for guidance on tuning these parameters.
+
+###Default values
+
+Currently the Kafka spout has the following default values, which have been shown to give good performance in the test environment described in this [blog post](https://hortonworks.com/blog/microbenchmarking-storm-1-0-performance/):
+
+* poll.timeout.ms = 200
+* offset.commit.period.ms = 30000   (30s)
+* max.uncommitted.offsets = 10000000
+<br/>
+
+There will be a blog post coming soon analyzing the trade-offs of these tuning parameters, and comparing the performance of the Kafka spouts using the Kafka client API introduced in 0.9 (new implementation) and in prior versions (prior implementation).
+
 #Future Work
  Implement comprehensive metrics. Trident spout is coming soon.
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4c231b4c/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index e9b37b3..4389acb 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -123,7 +123,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-            LOG.debug("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
+            LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
                     kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
             if (!consumerAutoCommitMode && initialized) {
                 initialized = false;
@@ -133,7 +133,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-            LOG.debug("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
+            LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
                     kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
 
             initialize(partitions);
@@ -152,7 +152,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 setAcked(tp, fetchOffset);
             }
             initialized = true;
-            LOG.debug("Initialization complete");
+            LOG.info("Initialization complete");
         }
 
         /**
@@ -221,7 +221,19 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     private boolean poll() {
-        return !waitingToEmit() && numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets();
+        final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
+        final boolean poll = !waitingToEmit() && numUncommittedOffsets < maxUncommittedOffsets;
+
+        if (!poll) {
+            if (waitingToEmit()) {
+                LOG.debug("Not polling. Tuples waiting to be emitted. [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets);
+            }
+
+            if (numUncommittedOffsets >= maxUncommittedOffsets) {
+                LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]", numUncommittedOffsets, maxUncommittedOffsets);
+            }
+        }
+        return poll;
     }
 
     private boolean waitingToEmit() {
@@ -234,7 +246,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             waitingToEmitList.addAll(consumerRecords.records(tp));
         }
         waitingToEmit = waitingToEmitList.iterator();
-        LOG.trace("Records waiting to be emitted {}", waitingToEmitList);
     }
 
     // ======== poll =========
@@ -243,7 +254,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
         final int numPolledRecords = consumerRecords.count();
-        LOG.debug("Polled [{}] records from Kafka. NumUncommittedOffsets=[{}]", numPolledRecords, numUncommittedOffsets);
+        LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions", numPolledRecords, numUncommittedOffsets);
         return consumerRecords;
     }
 
@@ -319,7 +330,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
         if (!consumerAutoCommitMode) {  // Only need to keep track of acked tuples if commits are not done automatically
             acked.get(msgId.getTopicPartition()).add(msgId);
-            LOG.trace("Acked message [{}]. Messages acked and pending commit [{}]", msgId, acked);
         }
         emitted.remove(msgId);
     }
@@ -393,7 +403,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     @Override
     public String toString() {
-        return "{acked=" + acked + "} ";
+        return "KafkaSpout{" +
+                "acked=" + acked +
+                ", emitted=" + emitted +
+                "}";
     }
 
     @Override
@@ -470,12 +483,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                     found = true;
                     nextCommitMsg = currAckedMsg;
                     nextCommitOffset = currOffset;
-                    LOG.trace("Found offset to commit [{}]. {}", currOffset, this);
                 } else if (currAckedMsg.offset() > nextCommitOffset + 1) {    // offset found is not continuous to the offsets listed to go in the next commit, so stop search
-                    LOG.debug("Non continuous offset found [{}]. It will be processed in a subsequent batch. {}", currOffset, this);
+                    LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
                     break;
                 } else {
-                    LOG.debug("Unexpected offset found [{}]. {}", currOffset, this);
+                    LOG.debug("topic-partition [{}] has unexpected offset [{}].", tp, currOffset);
                     break;
                 }
             }
@@ -483,10 +495,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             OffsetAndMetadata nextCommitOffsetAndMetadata = null;
             if (found) {
                 nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread()));
-                LOG.debug("Offset to be committed next: [{}] {}", nextCommitOffsetAndMetadata.offset(), this);
+                LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed",tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset());
             } else {
-                LOG.debug("No offsets ready to commit. {}", this);
+                LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp);
             }
+            LOG.trace("{}", this);
             return nextCommitOffsetAndMetadata;
         }
 
@@ -497,8 +510,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
          * @param committedOffset offset to be marked as committed
          */
         public void commit(OffsetAndMetadata committedOffset) {
+            long numCommittedOffsets = 0;
             if (committedOffset != null) {
-                final long numCommittedOffsets = committedOffset.offset() - this.committedOffset;
+                final long oldCommittedOffset = this.committedOffset;
+                numCommittedOffsets = committedOffset.offset() - this.committedOffset;
                 this.committedOffset = committedOffset.offset();
                 for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext(); ) {
                     if (iterator.next().offset() <= committedOffset.offset()) {
@@ -508,8 +523,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                     }
                 }
                 numUncommittedOffsets-= numCommittedOffsets;
+                LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}]. [{}] uncommitted offsets across all topic partitions",
+                        oldCommittedOffset + 1, this.committedOffset, numCommittedOffsets, tp, numUncommittedOffsets);
+            } else {
+                LOG.debug("Committed [{}] offsets for topic-partition [{}]. [{}] uncommitted offsets across all topic partitions",
+                        numCommittedOffsets, tp, numUncommittedOffsets);
             }
-            LOG.trace("Object state after update: {}, numUncommittedOffsets [{}]", this, numUncommittedOffsets);
+            LOG.trace("{}", this);
         }
 
         public boolean isEmpty() {

http://git-wip-us.apache.org/repos/asf/storm/blob/4c231b4c/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 1beec0e..7c97ac9 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -32,10 +32,10 @@ import java.util.regex.Pattern;
  * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics
  */
 public class KafkaSpoutConfig<K, V> implements Serializable {
-    public static final long DEFAULT_POLL_TIMEOUT_MS = 2_000;            // 2s
-    public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 15_000;   // 15s
+    public static final long DEFAULT_POLL_TIMEOUT_MS = 200;            // 200ms
+    public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;   // 30s
     public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;     // Retry forever
-    public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000;    // 10,000 records
+    public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;    // 10,000,000 records => 80MB worst-case memory footprint (~8 bytes per pending offset)
 
     // Kafka property names
     public interface Consumer {


[2/2] storm git commit: Added STORM-2052 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-2052 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9ae6e987
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9ae6e987
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9ae6e987

Branch: refs/heads/1.x-branch
Commit: 9ae6e987314b85b632909daceecbfb51ab382bfe
Parents: 4c231b4
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Mon Aug 29 11:57:40 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Aug 29 11:57:40 2016 -0700

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9ae6e987/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 91c327f..ffa83dc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.0
+ * STORM-2052: Kafka Spout New Client API - Log Improvements and Parameter Tuning for Better Performance.
  * STORM-2050: [storm-sql] Support User Defined Aggregate Function for Trident mode
  * STORM-1434: Support the GROUP BY clause in StormSQL
  * STORM-2016: Topology submission improvement: support adding local jars and maven artifacts on submission