Posted to commits@storm.apache.org by sr...@apache.org on 2016/08/29 18:56:35 UTC

[1/3] storm git commit: STORM-2052: Kafka Spout - New Client API - Log Improvements and Parameter Tuning for Better Performance - Set default config values for good performance - Improve Logging - Log heavy objects with TRACE level - Make log

Repository: storm
Updated Branches:
  refs/heads/master 7e15fbe34 -> 3e56bd10d


STORM-2052: Kafka Spout - New Client API - Log Improvements and Parameter Tuning for Better Performance
  - Set default config values for good performance
  - Improve Logging
    - Log heavy objects with TRACE level
    - Make log messages more meaningful
    - Delete a couple of logs
  - Update README


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9dd62dac
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9dd62dac
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9dd62dac

Branch: refs/heads/master
Commit: 9dd62dacf9aab8e5db3bb2647007956eb1425b03
Parents: 0b080fc
Author: Hugo Louro <hm...@gmail.com>
Authored: Tue Aug 2 22:24:52 2016 -0700
Committer: Hugo Louro <hm...@gmail.com>
Committed: Thu Aug 25 11:35:38 2016 -0700

----------------------------------------------------------------------
 external/storm-kafka-client/README.md           | 28 +++++++++++
 .../apache/storm/kafka/spout/KafkaSpout.java    | 50 ++++++++++++++------
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  6 +--
 3 files changed, 66 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9dd62dac/external/storm-kafka-client/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/README.md b/external/storm-kafka-client/README.md
index 15d5f94..50fe871 100644
--- a/external/storm-kafka-client/README.md
+++ b/external/storm-kafka-client/README.md
@@ -156,6 +156,34 @@ When selecting a kafka client version, you should ensure -
  2. The kafka client selected by you should be wire compatible with the broker. e.g. 0.9.x client will not work with 
  0.8.x broker. 
 
+#Kafka Spout Performance Tuning
+
+The Kafka spout provides two internal parameters to control its performance. The parameters can be set using the [KafkaSpoutConfig](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) methods [setOffsetCommitPeriodMs](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L189-L193) and [setMaxUncommittedOffsets](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L211-L217).
+
+* "offset.commit.period.ms" controls how often the spout commits to Kafka
+* "max.uncommitted.offsets" controls how many offsets can be pending commit before another poll can take place
+<br/>
+
+The [Kafka consumer config](http://kafka.apache.org/documentation.html#consumerconfigs) parameters may also affect the performance of the spout. The following Kafka parameters are likely to have the most influence on the spout's performance:
+
+* "fetch.min.bytes"
+* "fetch.max.wait.ms"
+* [Kafka Consumer](http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) instance poll timeout, which is specified for each Kafka spout using the [KafkaSpoutConfig](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) method [setPollTimeoutMs](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L180-L184)
+<br/>
+
+Depending on the structure of your Kafka cluster, distribution of the data, and availability of data to poll, these parameters will have to be configured appropriately. Please refer to the Kafka documentation on Kafka parameter tuning.
+
+###Default values
+
+Currently the Kafka spout has the following default values, which have been shown to give good performance in the test environment described in this [blog post](https://hortonworks.com/blog/microbenchmarking-storm-1-0-performance/):
+
+* poll.timeout.ms = 200
+* offset.commit.period.ms = 30000   (30s)
+* max.uncommitted.offsets = 10000000
+<br/>
+
+A blog post coming soon will analyze the trade-offs of these tuning parameters and compare the performance of the Kafka spouts using the Kafka client API introduced in 0.9 (new implementation) and in prior versions (prior implementation).
+
 #Future Work
  Implement comprehensive metrics. Trident spout is coming soon.
 

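The `max.uncommitted.offsets` bullet in the README above and the reworked `poll()` method in KafkaSpout.java describe when the spout is allowed to poll again. The following is a minimal, self-contained sketch of that gating rule, not the spout's actual code: the class `PollGate` and its methods are hypothetical, and the model assumes the uncommitted counter grows as records are emitted and shrinks when a commit succeeds.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical, simplified model of the spout's poll gate (not the Storm API).
 * Polling is allowed only when no fetched records are waiting to be emitted
 * and the uncommitted-offset count is below max.uncommitted.offsets.
 */
public class PollGate {
    private final int maxUncommittedOffsets;
    private final Deque<Long> waitingToEmit = new ArrayDeque<>(); // fetched, not yet emitted
    private long numUncommittedOffsets;                           // emitted, not yet committed

    public PollGate(int maxUncommittedOffsets) {
        this.maxUncommittedOffsets = maxUncommittedOffsets;
    }

    /** Same condition as KafkaSpout.poll(): nothing pending emission and below the threshold. */
    public boolean canPoll() {
        return waitingToEmit.isEmpty() && numUncommittedOffsets < maxUncommittedOffsets;
    }

    /** Offsets returned by a (simulated) consumer poll are queued for emission. */
    public void onPolled(long... offsets) {
        for (long offset : offsets) {
            waitingToEmit.add(offset);
        }
    }

    /** Emitting one record moves it from "waiting" to "uncommitted" (model assumption). */
    public void emitNext() {
        if (waitingToEmit.poll() != null) {
            numUncommittedOffsets++;
        }
    }

    /** A successful commit releases the given number of offsets. */
    public void onCommitted(long count) {
        numUncommittedOffsets -= count;
    }

    public static void main(String[] args) {
        PollGate gate = new PollGate(2);    // tiny threshold to make the effect visible
        System.out.println(gate.canPoll()); // true: nothing pending
        gate.onPolled(0L, 1L);
        System.out.println(gate.canPoll()); // false: records waiting to be emitted
        gate.emitNext();
        gate.emitNext();
        System.out.println(gate.canPoll()); // false: 2 uncommitted offsets reached the threshold
        gate.onCommitted(2);
        System.out.println(gate.canPoll()); // true again
    }
}
```

With the new default of 10,000,000, the threshold branch is rarely hit, so in practice the "tuples waiting to be emitted" condition is the one that usually pauses polling.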
http://git-wip-us.apache.org/repos/asf/storm/blob/9dd62dac/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index e9b37b3..4389acb 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -123,7 +123,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-            LOG.debug("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
+            LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
                     kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
             if (!consumerAutoCommitMode && initialized) {
                 initialized = false;
@@ -133,7 +133,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-            LOG.debug("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
+            LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
                     kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
 
             initialize(partitions);
@@ -152,7 +152,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 setAcked(tp, fetchOffset);
             }
             initialized = true;
-            LOG.debug("Initialization complete");
+            LOG.info("Initialization complete");
         }
 
         /**
@@ -221,7 +221,19 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     private boolean poll() {
-        return !waitingToEmit() && numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets();
+        final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
+        final boolean poll = !waitingToEmit() && numUncommittedOffsets < maxUncommittedOffsets;
+
+        if (!poll) {
+            if (waitingToEmit()) {
+                LOG.debug("Not polling. Tuples waiting to be emitted. [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets);
+            }
+
+            if (numUncommittedOffsets >= maxUncommittedOffsets) {
+                LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]", numUncommittedOffsets, maxUncommittedOffsets);
+            }
+        }
+        return poll;
     }
 
     private boolean waitingToEmit() {
@@ -234,7 +246,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             waitingToEmitList.addAll(consumerRecords.records(tp));
         }
         waitingToEmit = waitingToEmitList.iterator();
-        LOG.trace("Records waiting to be emitted {}", waitingToEmitList);
     }
 
     // ======== poll =========
@@ -243,7 +254,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
         final int numPolledRecords = consumerRecords.count();
-        LOG.debug("Polled [{}] records from Kafka. NumUncommittedOffsets=[{}]", numPolledRecords, numUncommittedOffsets);
+        LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions", numPolledRecords, numUncommittedOffsets);
         return consumerRecords;
     }
 
@@ -319,7 +330,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
         if (!consumerAutoCommitMode) {  // Only need to keep track of acked tuples if commits are not done automatically
             acked.get(msgId.getTopicPartition()).add(msgId);
-            LOG.trace("Acked message [{}]. Messages acked and pending commit [{}]", msgId, acked);
         }
         emitted.remove(msgId);
     }
@@ -393,7 +403,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     @Override
     public String toString() {
-        return "{acked=" + acked + "} ";
+        return "KafkaSpout{" +
+                "acked=" + acked +
+                ", emitted=" + emitted +
+                "}";
     }
 
     @Override
@@ -470,12 +483,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                     found = true;
                     nextCommitMsg = currAckedMsg;
                     nextCommitOffset = currOffset;
-                    LOG.trace("Found offset to commit [{}]. {}", currOffset, this);
                 } else if (currAckedMsg.offset() > nextCommitOffset + 1) {    // offset found is not continuous to the offsets listed to go in the next commit, so stop search
-                    LOG.debug("Non continuous offset found [{}]. It will be processed in a subsequent batch. {}", currOffset, this);
+                    LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
                     break;
                 } else {
-                    LOG.debug("Unexpected offset found [{}]. {}", currOffset, this);
+                    LOG.debug("topic-partition [{}] has unexpected offset [{}].", tp, currOffset);
                     break;
                 }
             }
@@ -483,10 +495,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             OffsetAndMetadata nextCommitOffsetAndMetadata = null;
             if (found) {
                 nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread()));
-                LOG.debug("Offset to be committed next: [{}] {}", nextCommitOffsetAndMetadata.offset(), this);
+                LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset());
             } else {
-                LOG.debug("No offsets ready to commit. {}", this);
+                LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp);
             }
+            LOG.trace("{}", this);
             return nextCommitOffsetAndMetadata;
         }
 
@@ -497,8 +510,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
          * @param committedOffset offset to be marked as committed
          */
         public void commit(OffsetAndMetadata committedOffset) {
+            long numCommittedOffsets = 0;
             if (committedOffset != null) {
-                final long numCommittedOffsets = committedOffset.offset() - this.committedOffset;
+                final long oldCommittedOffset = this.committedOffset;
+                numCommittedOffsets = committedOffset.offset() - this.committedOffset;
                 this.committedOffset = committedOffset.offset();
                 for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext(); ) {
                     if (iterator.next().offset() <= committedOffset.offset()) {
@@ -508,8 +523,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                     }
                 }
                 numUncommittedOffsets-= numCommittedOffsets;
+                LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}]. [{}] uncommitted offsets across all topic partitions",
+                        oldCommittedOffset + 1, this.committedOffset, numCommittedOffsets, tp, numUncommittedOffsets);
+            } else {
+                LOG.debug("Committed [{}] offsets for topic-partition [{}]. [{}] uncommitted offsets across all topic partitions",
+                        numCommittedOffsets, tp, numUncommittedOffsets);
             }
-            LOG.trace("Object state after update: {}, numUncommittedOffsets [{}]", this, numUncommittedOffsets);
+            LOG.trace("{}", this);
         }
 
         public boolean isEmpty() {
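The new log messages in `findNextCommitOffset()` and `commit()` report a range of offsets "ready to be committed" and defer "non-continuous" offsets to a later batch. The sketch below models that rule for a single topic-partition: only the gap-free run of acked offsets immediately after the last committed offset is committed. It is a hypothetical, simplified class (`PartitionOffsets`, storing plain `long` offsets instead of `KafkaSpoutMessageId` objects), not the Storm implementation.

```java
import java.util.NavigableSet;
import java.util.OptionalLong;
import java.util.TreeSet;

/**
 * Hypothetical model of per-partition offset tracking (not the Storm class).
 * Only the contiguous run of acked offsets right after committedOffset can be
 * committed; anything after a gap waits for a subsequent batch.
 */
public class PartitionOffsets {
    private final NavigableSet<Long> acked = new TreeSet<>(); // acked, not yet committed, sorted
    private long committedOffset;                              // last committed offset

    public PartitionOffsets(long committedOffset) {
        this.committedOffset = committedOffset;
    }

    public void ack(long offset) {
        acked.add(offset);
    }

    /** Highest offset of the gap-free run starting at committedOffset + 1, if any. */
    public OptionalLong findNextCommitOffset() {
        long next = committedOffset;
        boolean found = false;
        for (long offset : acked) {
            if (offset != next + 1) {
                break; // gap or unexpected offset: stop the search, as the original code does
            }
            next = offset;
            found = true;
        }
        return found ? OptionalLong.of(next) : OptionalLong.empty();
    }

    /** Marks offsets up to newCommitted as committed; returns how many offsets that covers. */
    public long commit(long newCommitted) {
        long numCommitted = newCommitted - committedOffset;
        committedOffset = newCommitted;
        acked.headSet(newCommitted, true).clear(); // drop acked offsets <= newCommitted
        return numCommitted;
    }

    public static void main(String[] args) {
        PartitionOffsets p = new PartitionOffsets(9); // offsets up to 9 already committed
        p.ack(10);
        p.ack(11);
        p.ack(13);                                     // 12 not acked yet: gap
        OptionalLong next = p.findNextCommitOffset();
        System.out.println(next.getAsLong());           // 11: offsets 10-11 ready to be committed
        System.out.println(p.commit(next.getAsLong())); // 2
        System.out.println(p.findNextCommitOffset().isPresent()); // false: 13 waits for 12
    }
}
```

The returned count plays the role of `numCommittedOffsets` in the diff, which the spout subtracts from its global `numUncommittedOffsets`.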

http://git-wip-us.apache.org/repos/asf/storm/blob/9dd62dac/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 1beec0e..7c97ac9 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -32,10 +32,10 @@ import java.util.regex.Pattern;
  * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics
  */
 public class KafkaSpoutConfig<K, V> implements Serializable {
-    public static final long DEFAULT_POLL_TIMEOUT_MS = 2_000;            // 2s
-    public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 15_000;   // 15s
+    public static final long DEFAULT_POLL_TIMEOUT_MS = 200;            // 200ms
+    public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;   // 30s
     public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;     // Retry forever
-    public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000;    // 10,000 records
+    public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;    // 10,000,000 records => ~80 MB of memory footprint in the worst case
 
     // Kafka property names
     public interface Consumer {
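The comment on `DEFAULT_MAX_UNCOMMITTED_OFFSETS` estimates an 80 MB worst case. That figure matches 8 bytes (one Java `long`) per uncommitted offset; the short sketch below reproduces the arithmetic under that assumption. It counts only the raw offset values and ignores per-object overhead, so the real footprint of tracked message objects is likely larger. The class and method names are hypothetical.

```java
/** Reproduces the "10,000,000 records => 80MB" estimate, assuming 8 bytes per uncommitted offset. */
public class UncommittedFootprint {
    static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000; // value set by this commit

    /** Worst-case bytes if each uncommitted offset costs one long; object overhead is ignored. */
    static long worstCaseBytes(int maxUncommittedOffsets) {
        return (long) maxUncommittedOffsets * Long.BYTES;
    }

    public static void main(String[] args) {
        long bytes = worstCaseBytes(DEFAULT_MAX_UNCOMMITTED_OFFSETS);
        System.out.println(bytes);                     // 80000000
        System.out.println(bytes / 1_000_000 + " MB"); // 80 MB (decimal megabytes)
    }
}
```

By the same measure, the previous default of 10,000 offsets corresponds to about 80 KB.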


[3/3] storm git commit: Added STORM-2052 to CHANGELOG.

Added STORM-2052 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3e56bd10
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3e56bd10
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3e56bd10

Branch: refs/heads/master
Commit: 3e56bd10df812d9c27a39ed94f3d22a3ecfffd52
Parents: 5e36f93
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Mon Aug 29 11:55:35 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Aug 29 11:55:35 2016 -0700

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3e56bd10/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c78101c..8b63a6e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-2052: Kafka Spout New Client API - Log Improvements and Parameter Tuning for Better Performance
  * STORM-2045: fixed SpoutExecutor NPE
  * STORM-2041: Make Java 8 as minimum requirement for 2.0 release
  * STORM-1256: port backtype.storm.utils.ZookeeperServerCnxnFactory-test to java


[2/3] storm git commit: Merge branch 'Apache_master_STORM-2052_KSPI' of https://github.com/hmcl/storm-apache into STORM-2052

Merge branch 'Apache_master_STORM-2052_KSPI' of https://github.com/hmcl/storm-apache into STORM-2052


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5e36f934
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5e36f934
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5e36f934

Branch: refs/heads/master
Commit: 5e36f9342d2322ea987a2d386e11724de4b514dd
Parents: 7e15fbe 9dd62da
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Mon Aug 29 11:52:12 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Aug 29 11:52:12 2016 -0700

----------------------------------------------------------------------
 external/storm-kafka-client/README.md           | 28 +++++++++++
 .../apache/storm/kafka/spout/KafkaSpout.java    | 50 ++++++++++++++------
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  6 +--
 3 files changed, 66 insertions(+), 18 deletions(-)
----------------------------------------------------------------------