Posted to dev@storm.apache.org by liurenjie1024 <gi...@git.apache.org> on 2016/12/14 06:55:37 UTC

[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

GitHub user liurenjie1024 opened a pull request:

    https://github.com/apache/storm/pull/1825

    STORM-2236 storm kafka client support manual partition management.

    1. Support manual partition assignment, since Kafka-managed partition assignment may cause unnecessary rebalances if the consumer doesn't keep polling.
    2. A small change to fix STORM-2077, where failed messages may be lost.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/MediaV/storm mvad-1.x

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1825.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1825
    
----
commit e04e5002cc434dbb3f70e0b6d69725705f008a14
Author: liurenjie1024 <li...@gmail.com>
Date:   2016-12-14T06:37:41Z

    Assign partitions to consumers manually

commit ca091c566f877b1e0c507ea23368820c5930252a
Author: liurenjie1024 <li...@gmail.com>
Date:   2016-12-14T06:45:19Z

    Merge branch '1.x-branch' of github.com:apache/storm into mvad-1.x

commit 3bb2be09b984b4c7727a8cd47f2479c4a49dcd0d
Author: liurenjie1024 <li...@gmail.com>
Date:   2016-12-14T06:47:55Z

    Fix STORM-2077

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r93110414
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -236,6 +257,35 @@ private boolean poll() {
             return poll;
         }
     
    +    private void refreshPartitionIfNeeded() {
    +        if (!manualPartitionAssignment || !partitionRefreshTimer.isExpiredResetOnTrue()) return;
    +        doRefreshPartitions();
    +    }
    +
    +    private void doRefreshPartitions() {
    +        KafkaSpoutStreamsNamedTopics streams = KafkaSpoutStreamsNamedTopics.class.cast(kafkaSpoutStreams);
    --- End diff --
    
    As far as I know, doing `(KafkaSpoutStreamsNamedTopics) streams` throws a ClassCastException, not an NPE, if `streams` isn't assignable to the target type? Doesn't really matter, I was just curious :)
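A small self-contained check of the question above. `SpoutStreams`, `NamedTopicStreams`, and `WildcardTopicStreams` are hypothetical stand-ins for `KafkaSpoutStreams` and its subclasses, not the real storm-kafka-client types. It shows that both cast forms throw `ClassCastException` for an incompatible object and both pass `null` through unchanged, so neither form throws an NPE at the cast itself.

```java
import java.util.function.Function;

// Hypothetical stand-ins for KafkaSpoutStreams and two of its subclasses.
abstract class SpoutStreams {}
class NamedTopicStreams extends SpoutStreams {}
class WildcardTopicStreams extends SpoutStreams {}

class CastDemo {
    // C-style cast: checked at runtime, throws ClassCastException on a type mismatch.
    static NamedTopicStreams cStyleCast(SpoutStreams s) {
        return (NamedTopicStreams) s;
    }

    // Class.cast: the same runtime check, the same exception type on a mismatch.
    static NamedTopicStreams classCast(SpoutStreams s) {
        return NamedTopicStreams.class.cast(s);
    }

    // Returns the simple name of the exception thrown by the given cast, or "none".
    static String thrownBy(Function<SpoutStreams, NamedTopicStreams> cast, SpoutStreams s) {
        try {
            cast.apply(s);
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }
}
```

The two forms behave the same here; `Class.cast` is mainly useful when the target type is only available as a `Class<T>` value, for example in generic code.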



[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r93008701
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -262,11 +312,12 @@ private void doSeekRetriableTopicPartitions() {
             final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
     
             for (TopicPartition rtp : retriableTopicPartitions) {
    -            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
    -            if (offsetAndMeta != null) {
    -                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
    -            } else {
    -                kafkaConsumer.seekToEnd(toArrayList(rtp));    // Seek to last committed offset
    +            KafkaSpout.OffsetEntry entry = acked.get(rtp);
    +            if (entry != null) {
    --- End diff --
    
    Fixed in the new PR.



[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r92844143
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -59,18 +60,24 @@
     
         // Storm
         protected SpoutOutputCollector collector;
    +    protected int thisTaskIndex;
    --- End diff --
    
    These could be private



[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r92851175
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java ---
    @@ -0,0 +1,21 @@
    +package org.apache.storm.kafka.spout;
    +
    +import org.apache.kafka.common.TopicPartition;
    +
    +import java.util.Comparator;
    +
    +/**
    + * Created by liurenjie on 12/7/16.
    + */
    +public enum TopicPartitionComparator implements Comparator<TopicPartition> {
    --- End diff --
    
    Nit: I don't think there's anything wrong with making this an enum, but it seems conceptually weird to me. Why not just make this a regular class, and then make the field for it in the spout static?
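A sketch of the alternative suggested above: a plain `Comparator` class, with one shared instance held in a `static final` field of the spout. The `TopicPartition` class here is a hypothetical stand-in for `org.apache.kafka.common.TopicPartition`, `SpoutSketch` is a hypothetical name, and ordering by topic name and then partition number is an assumption, since the diff does not show the comparator body.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final class TopicPartition {
    private final String topic;
    private final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    String topic() { return topic; }
    int partition() { return partition; }

    @Override
    public String toString() { return topic + "-" + partition; }
}

// A regular class instead of an enum; it is stateless, so one instance can be shared.
class TopicPartitionComparator implements Comparator<TopicPartition> {
    @Override
    public int compare(TopicPartition a, TopicPartition b) {
        // Assumed ordering: by topic name first, then by partition number.
        int byTopic = a.topic().compareTo(b.topic());
        return byTopic != 0 ? byTopic : Integer.compare(a.partition(), b.partition());
    }
}

class SpoutSketch {
    // Shared comparator held in a static field of the (hypothetical) spout.
    private static final Comparator<TopicPartition> TP_COMPARATOR = new TopicPartitionComparator();

    // Returns a sorted copy, leaving the caller's list untouched.
    static List<TopicPartition> sorted(List<TopicPartition> partitions) {
        List<TopicPartition> copy = new ArrayList<>(partitions);
        copy.sort(TP_COMPARATOR);
        return copy;
    }
}
```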



[GitHub] storm issue #1825: STORM-2236 storm kafka client support manual partition ma...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1825
  
    @liurenjie1024 putting aside for a second the discussion about whether we need manual partition assignment at all: if we do agree to support it, I think the `KafkaSpout` class is getting a bit too bloated to handle these separate requirements. In a sense this is a new feature on its own. I would favor a more object-oriented design if possible, perhaps subclassing or composing `KafkaSpout`. @srdo @revans2, what's your opinion?



[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r92847519
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -262,11 +312,12 @@ private void doSeekRetriableTopicPartitions() {
             final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
     
             for (TopicPartition rtp : retriableTopicPartitions) {
    -            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
    -            if (offsetAndMeta != null) {
    -                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
    -            } else {
    -                kafkaConsumer.seekToEnd(toArrayList(rtp));    // Seek to last committed offset
    +            KafkaSpout.OffsetEntry entry = acked.get(rtp);
    +            if (entry != null) {
    --- End diff --
    
    I'd like it if you could revert the commit for fixing STORM-2077, since a different fix is being ported back from master.



[GitHub] storm issue #1825: STORM-2236 storm kafka client support manual partition ma...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on the issue:

    https://github.com/apache/storm/pull/1825
  
    Maybe we do not need to maintain two versions of `KafkaSpout` with the same functionality. The main goal of automatic partition management is to ease fault tolerance in simple applications, not in frameworks like Storm. Automatic partition management may bring hidden overhead and unpredictable behavior, which are unnecessary in a framework like Storm.



[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r92845884
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -236,6 +257,35 @@ private boolean poll() {
             return poll;
         }
     
    +    private void refreshPartitionIfNeeded() {
    +        if (!manualPartitionAssignment || !partitionRefreshTimer.isExpiredResetOnTrue()) return;
    +        doRefreshPartitions();
    +    }
    +
    +    private void doRefreshPartitions() {
    +        KafkaSpoutStreamsNamedTopics streams = KafkaSpoutStreamsNamedTopics.class.cast(kafkaSpoutStreams);
    --- End diff --
    
    I think you can still support wildcard topics via KafkaConsumer.listTopics. If it isn't added in this PR, we should make an issue for it at least.
    
    Also, nit: why are you using Class.cast instead of a C-style cast?
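A minimal sketch of the wildcard suggestion, assuming the spout periodically matches the topics returned by `KafkaConsumer.listTopics()` against the subscription `Pattern`. To stay self-contained, the metadata is passed in as a plain map from topic name to partition ids instead of the real `Map<String, List<PartitionInfo>>`, partitions are returned as `"topic-partition"` strings, and `WildcardTopicResolver` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

class WildcardTopicResolver {
    /**
     * @param topicPartitionIds topic name -> partition ids (in the spout this would be
     *                          derived from KafkaConsumer.listTopics())
     * @param topicPattern      the wildcard subscription pattern
     * @return "topic-partition" names for every matching topic, sorted by topic then id
     */
    static List<String> resolve(Map<String, List<Integer>> topicPartitionIds, Pattern topicPattern) {
        List<String> result = new ArrayList<>();
        // A TreeMap iterates topics in sorted order, so every task builds the same list.
        Map<String, List<Integer>> sortedTopics = new TreeMap<>(topicPartitionIds);
        for (Map.Entry<String, List<Integer>> entry : sortedTopics.entrySet()) {
            if (!topicPattern.matcher(entry.getKey()).matches()) {
                continue;
            }
            List<Integer> ids = new ArrayList<>(entry.getValue());
            Collections.sort(ids);
            for (int id : ids) {
                result.add(entry.getKey() + "-" + id);
            }
        }
        return result;
    }
}
```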



[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r93008815
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -230,8 +237,37 @@ public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStrea
             }
     
             public KafkaSpoutConfig<K,V> build() {
    +            validate();
                 return new KafkaSpoutConfig<>(this);
             }
    +
    +        /**
    +         * Defines whether the consumer manages partition manually.
    +         * If set to true, the consumer behaves like a simple consumer, otherwise it will rely on kafka to do partition assignment.
    +         * @param manualPartitionAssign Whether use manual partition assignment.
    --- End diff --
    
    Fixed in the new PR.



[GitHub] storm issue #1825: STORM-2236 storm kafka client support manual partition ma...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1825
  
    Also, I was wrong about spout instances interfering during reassignment. Since the task count is constant for the lifetime of the topology, a partition will always be assigned to the same task, so there's no issue. My bad :)



[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r93008846
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -230,8 +237,37 @@ public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStrea
             }
     
             public KafkaSpoutConfig<K,V> build() {
    +            validate();
                 return new KafkaSpoutConfig<>(this);
             }
    +
    +        /**
    +         * Defines whether the consumer manages partition manually.
    +         * If set to true, the consumer behaves like a simple consumer, otherwise it will rely on kafka to do partition assignment.
    +         * @param manualPartitionAssign Whether use manual partition assignment.
    +         */
    +        public Builder setManualPartitionAssign(boolean manualPartitionAssign) {
    +            this.manualPartitionAssign = manualPartitionAssign;
    +            return this;
    +        }
    +
    +        /**
    +         * Defines partition refresh period in the manual partition assign model.
    +         * @param partitionRefreshPeriodMs Partition refresh period in ms.
    +         */
    +        public Builder setPartitionRefreshPeriodMs(int partitionRefreshPeriodMs) {
    +            this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
    +            return this;
    +        }
    +
    +        /**
    +         * Validate configs before build.
    +         */
    +        private void validate() {
    +            if (this.manualPartitionAssign && kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) {
    +                throw new IllegalArgumentException("Manual partition assign can't be used with wildcard topics!");
    --- End diff --
    
    Fixed in the new PR.



[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r93008860
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java ---
    @@ -0,0 +1,21 @@
    +package org.apache.storm.kafka.spout;
    +
    +import org.apache.kafka.common.TopicPartition;
    +
    +import java.util.Comparator;
    +
    +/**
    + * Created by liurenjie on 12/7/16.
    + */
    +public enum TopicPartitionComparator implements Comparator<TopicPartition> {
    --- End diff --
    
    Fixed in the new PR.



[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r93008412
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -236,6 +257,35 @@ private boolean poll() {
             return poll;
         }
     
    +    private void refreshPartitionIfNeeded() {
    --- End diff --
    
    Fixed in the new PR.



[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r92850866
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java ---
    @@ -0,0 +1,21 @@
    +package org.apache.storm.kafka.spout;
    +
    +import org.apache.kafka.common.TopicPartition;
    +
    +import java.util.Comparator;
    +
    +/**
    + * Created by liurenjie on 12/7/16.
    --- End diff --
    
    Please remove the "Created by" comment from the files.



[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r92850572
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -230,8 +237,37 @@ public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStrea
             }
     
             public KafkaSpoutConfig<K,V> build() {
    +            validate();
                 return new KafkaSpoutConfig<>(this);
             }
    +
    +        /**
    +         * Defines whether the consumer manages partition manually.
    +         * If set to true, the consumer behaves like a simple consumer, otherwise it will rely on kafka to do partition assignment.
    +         * @param manualPartitionAssign Whether use manual partition assignment.
    +         */
    +        public Builder setManualPartitionAssign(boolean manualPartitionAssign) {
    +            this.manualPartitionAssign = manualPartitionAssign;
    +            return this;
    +        }
    +
    +        /**
    +         * Defines partition refresh period in the manual partition assign model.
    +         * @param partitionRefreshPeriodMs Partition refresh period in ms.
    +         */
    +        public Builder setPartitionRefreshPeriodMs(int partitionRefreshPeriodMs) {
    +            this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
    +            return this;
    +        }
    +
    +        /**
    +         * Validate configs before build.
    +         */
    +        private void validate() {
    +            if (this.manualPartitionAssign && kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) {
    +                throw new IllegalArgumentException("Manual partition assign can't be used with wildcard topics!");
    --- End diff --
    
    assign -> assignment



[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r92850399
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -230,8 +237,37 @@ public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStrea
             }
     
             public KafkaSpoutConfig<K,V> build() {
    +            validate();
                 return new KafkaSpoutConfig<>(this);
             }
    +
    +        /**
    +         * Defines whether the consumer manages partition manually.
    +         * If set to true, the consumer behaves like a simple consumer, otherwise it will rely on kafka to do partition assignment.
    +         * @param manualPartitionAssign Whether use manual partition assignment.
    +         */
    +        public Builder setManualPartitionAssign(boolean manualPartitionAssign) {
    +            this.manualPartitionAssign = manualPartitionAssign;
    +            return this;
    +        }
    +
    +        /**
    +         * Defines partition refresh period in the manual partition assign model.
    +         * @param partitionRefreshPeriodMs Partition refresh period in ms.
    +         */
    +        public Builder setPartitionRefreshPeriodMs(int partitionRefreshPeriodMs) {
    +            this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
    +            return this;
    +        }
    +
    +        /**
    +         * Validate configs before build.
    +         */
    +        private void validate() {
    +            if (this.manualPartitionAssign && kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) {
    --- End diff --
    
    Probably better to check that kafkaSpoutStreams is an instance of KafkaSpoutStreamsNamedTopics, so this doesn't break if more subclasses are added at some point.
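A sketch of the allow-list check suggested above. The class names are hypothetical stand-ins for the `KafkaSpoutStreams` hierarchy, and `SomeFutureStreams` models a subclass that might be added later; with the positive `instanceof` test, that new subclass is rejected until manual assignment explicitly supports it, whereas the original negative check would silently let it through.

```java
// Hypothetical stand-ins for the KafkaSpoutStreams hierarchy.
abstract class SpoutStreams {}
class NamedTopicStreams extends SpoutStreams {}
class WildcardTopicStreams extends SpoutStreams {}
class SomeFutureStreams extends SpoutStreams {} // a subclass added at some later point

class ConfigValidator {
    static void validate(boolean manualPartitionAssignmentEnabled, SpoutStreams streams) {
        // Allow-list the supported type rather than deny-listing the unsupported one.
        if (manualPartitionAssignmentEnabled && !(streams instanceof NamedTopicStreams)) {
            throw new IllegalArgumentException(
                "Manual partition assignment can only be used with named topics");
        }
    }

    // Convenience wrapper: true if validate() accepts the combination.
    static boolean isValid(boolean manualPartitionAssignmentEnabled, SpoutStreams streams) {
        try {
            validate(manualPartitionAssignmentEnabled, streams);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}
```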



[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 closed the pull request at:

    https://github.com/apache/storm/pull/1825



[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r93002685
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -59,18 +60,24 @@
     
         // Storm
         protected SpoutOutputCollector collector;
    +    protected int thisTaskIndex;
    --- End diff --
    
    Fixed.



[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r92850100
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -230,8 +237,37 @@ public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStrea
             }
     
             public KafkaSpoutConfig<K,V> build() {
    +            validate();
                 return new KafkaSpoutConfig<>(this);
             }
    +
    +        /**
    +         * Defines whether the consumer manages partition manually.
    +         * If set to true, the consumer behaves like a simple consumer, otherwise it will rely on kafka to do partition assignment.
    +         * @param manualPartitionAssign Whether use manual partition assignment.
    --- End diff --
    
    Nit: "True if using manual partition assignment" is clearer IMO



[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r93008354
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -200,16 +215,22 @@ private void setAcked(TopicPartition tp, long fetchOffset) {
         @Override
         public void nextTuple() {
             if (initialized) {
    -            if (commit()) {
    -                commitOffsetsForAckedTuples();
    -            }
    +            try {
    +                refreshPartitionIfNeeded();
     
    -            if (poll()) {
    -                setWaitingToEmit(pollKafkaBroker());
    -            }
    +                if (commit()) {
    +                    commitOffsetsForAckedTuples();
    +                }
     
    -            if (waitingToEmit()) {
    -                emit();
    +                if (poll()) {
    +                    setWaitingToEmit(pollKafkaBroker());
    +                }
    +
    +                if (waitingToEmit()) {
    +                    emit();
    +                }
    +            } catch (Exception e) {
    --- End diff --
    
    It's not uncommon for the Kafka consumer to hit an exception when polling data from the broker, and I don't think that should cause the spout to restart. But I shouldn't have wrapped the whole function in the try block. This has been resolved in the new patch.
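One way to read the approach described above, sketched under assumptions: only the broker poll is wrapped, and only one specific exception type is swallowed, so bugs elsewhere in `nextTuple()` still surface. `Broker`, `TransientPollException`, and `NextTupleSketch` are hypothetical; in the real spout the call would be `KafkaConsumer.poll`, and deciding which Kafka exceptions are safe to ignore is a separate question.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical exception standing in for a recoverable poll failure.
class TransientPollException extends RuntimeException {
    TransientPollException(String msg) { super(msg); }
}

// Hypothetical stand-in for the consumer's poll call.
interface Broker {
    List<String> poll();
}

class NextTupleSketch {
    private final Broker broker;
    int skippedPolls = 0;

    NextTupleSketch(Broker broker) { this.broker = broker; }

    // Only the poll is guarded; commit and emit logic would stay outside this try block.
    List<String> pollKafkaBroker() {
        try {
            return broker.poll();
        } catch (TransientPollException e) {
            // Counted here; a real spout would log it and poll again on the next nextTuple().
            skippedPolls++;
            return Collections.emptyList();
        }
    }
}
```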



[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r92845477
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -236,6 +257,35 @@ private boolean poll() {
             return poll;
         }
     
    +    private void refreshPartitionIfNeeded() {
    +        if (!manualPartitionAssignment || !partitionRefreshTimer.isExpiredResetOnTrue()) return;
    --- End diff --
    
    Please add braces to this if
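The braced form of the early return, in a small runnable sketch. `RefreshTimer` is a hypothetical stand-in for the spout's timer (only `isExpiredResetOnTrue()` is taken from the diff), with an injectable clock so it can be tested. Because `||` short-circuits, the timer is never consulted when manual assignment is disabled, which matters since the diff notes the timer is only non-null in manual mode.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in for the spout's Timer: reports expiry and resets itself when it does.
class RefreshTimer {
    private final long periodMs;
    private final LongSupplier clockMs;
    private long startMs;

    RefreshTimer(long periodMs, LongSupplier clockMs) {
        this.periodMs = periodMs;
        this.clockMs = clockMs;
        this.startMs = clockMs.getAsLong();
    }

    boolean isExpiredResetOnTrue() {
        long now = clockMs.getAsLong();
        if (now - startMs >= periodMs) {
            startMs = now;
            return true;
        }
        return false;
    }
}

class RefreshSketch {
    private final boolean manualPartitionAssignmentEnabled;
    private final RefreshTimer partitionRefreshTimer; // null when manual assignment is off
    int refreshCount = 0;

    RefreshSketch(boolean enabled, RefreshTimer timer) {
        this.manualPartitionAssignmentEnabled = enabled;
        this.partitionRefreshTimer = timer;
    }

    void refreshPartitionIfNeeded() {
        if (!manualPartitionAssignmentEnabled || !partitionRefreshTimer.isExpiredResetOnTrue()) {
            return;
        }
        doRefreshPartitions();
    }

    private void doRefreshPartitions() {
        refreshCount++; // the real spout would recompute and reassign partitions here
    }
}
```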



[GitHub] storm issue #1825: STORM-2236 storm kafka client support manual partition ma...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1825
  
    Btw @liurenjie1024, you need to make a PR for this against master first. As far as I know, we can't merge features to 1.x before they are also on master.



[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r92845198
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -59,18 +60,24 @@
     
         // Storm
         protected SpoutOutputCollector collector;
    +    protected int thisTaskIndex;
    +    protected int taskCount;
     
         // Kafka
         private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
         private transient KafkaConsumer<K, V> kafkaConsumer;
         private transient boolean consumerAutoCommitMode;
     
     
    +
         // Bookkeeping
         private transient int maxRetries;                                   // Max number of times a tuple is retried
         private transient FirstPollOffsetStrategy firstPollOffsetStrategy;  // Strategy to determine the fetch offset of the first realized by the spout upon activation
         private transient KafkaSpoutRetryService retryService;              // Class that has the logic to handle tuple failure
         private transient Timer commitTimer;                                // timer == null for auto commit mode
    +    private transient Timer partitionRefreshTimer;                      // partitionRefreshTime != null if in manual partition assign model
    +    private transient boolean manualPartitionAssignment;
    --- End diff --
    
    Nit: Rename to manualPartitionAssignmentEnabled here and in KafkaSpoutConfig



[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r92849984
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -230,8 +237,37 @@ public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStrea
             }
     
             public KafkaSpoutConfig<K,V> build() {
    +            validate();
                 return new KafkaSpoutConfig<>(this);
             }
    +
    +        /**
    +         * Defines whether the consumer manages partition manually.
    +         * If set to true, the consumer behaves like a simple consumer, otherwise it will rely on kafka to do partition assignment.
    --- End diff --
    
    Consider changing this to "If set to true, the spout instances are assigned partitions round-robin", since readers may not know what it means to behave like a simple consumer.
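Here is a minimal sketch of what "assigned partitions round-robin" can mean. It assumes each spout instance knows its own task index and the total task count, as the `thisTaskIndex`/`taskCount` fields in the earlier diff suggest. It also assumes the partition list is sorted first, so every instance computes the same order. The `Partition` and `RoundRobinAssigner` classes are hypothetical. `Partition` stands in for Kafka's `TopicPartition` so the example runs without kafka-clients.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for Kafka's TopicPartition so the sketch runs without kafka-clients.
final class Partition {
    final String topic;
    final int number;

    Partition(String topic, int number) {
        this.topic = topic;
        this.number = number;
    }

    @Override
    public String toString() {
        return topic + "-" + number;
    }
}

// Hypothetical helper: which partitions does the task at taskIndex (out of taskCount) own?
final class RoundRobinAssigner {
    static List<Partition> assign(List<Partition> all, int taskIndex, int taskCount) {
        // Sort so every task sees the same order, regardless of how the broker listed them.
        List<Partition> sorted = new ArrayList<>(all);
        sorted.sort(Comparator.comparing((Partition p) -> p.topic)
                .thenComparingInt(p -> p.number));
        List<Partition> mine = new ArrayList<>();
        // Deal partitions out like cards: the i-th partition goes to task i % taskCount.
        for (int i = taskIndex; i < sorted.size(); i += taskCount) {
            mine.add(sorted.get(i));
        }
        return mine;
    }
}
```

Take five partitions of topic `t` split across two tasks. Task 0 gets t-0, t-2 and t-4, and task 1 gets t-1 and t-3. No instance needs to talk to any other to agree on this split.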


---

[GitHub] storm issue #1825: STORM-2236 storm kafka client support manual partition ma...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1825
  
    Hi. STORM-2077 is being resolved in STORM-2087 (https://issues.apache.org/jira/browse/STORM-2087). Sorry for the inconvenience.
    
    I'm not sure I understand why adding manual partition management is necessary. Kafka recently added heartbeating from a background thread, so it's no longer necessary to poll often in order to avoid rebalances (see http://kafka.apache.org/documentation.html#upgrade_1010_notable, search for "heartbeat"). That change decouples the required poll frequency from the time it takes to detect a crashed consumer. As a result, `max.poll.interval.ms` can be raised if its default of 5 minutes is too short. A rebalance is then triggered only if poll isn't called within that interval.
    
    Could you describe a use case?
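The heartbeat change described above can be illustrated with a minimal sketch of the consumer properties involved. It assumes kafka-clients 0.10.1 or later, the release that introduced background heartbeating (KIP-62). The property keys are real consumer config names. The values and the `ConsumerTimeouts` helper are illustrative only. Plain string keys are used so the snippet compiles without kafka-clients on the classpath.

```java
import java.util.Properties;

// Hypothetical helper showing which consumer settings govern liveness vs. poll frequency
// (kafka-clients >= 0.10.1). Values other than maxPollIntervalMs are illustrative.
final class ConsumerTimeouts {
    static Properties tuned(Properties base, long maxPollIntervalMs) {
        Properties props = new Properties();
        props.putAll(base);
        // Crash detection: the background heartbeat thread must reach the coordinator
        // within session.timeout.ms, independently of how often poll() is called.
        props.setProperty("session.timeout.ms", "10000");
        // Typically no more than a third of session.timeout.ms.
        props.setProperty("heartbeat.interval.ms", "3000");
        // Maximum gap between poll() calls before the consumer leaves the group
        // and a rebalance is triggered; the default is 300000 ms (5 minutes).
        props.setProperty("max.poll.interval.ms", Long.toString(maxPollIntervalMs));
        return props;
    }
}
```

Because the two concerns are now separate, a slow bolt pipeline that throttles polling only needs a larger `max.poll.interval.ms`. Crash detection still happens within the session timeout.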


---

[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r93008736
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -230,8 +237,37 @@ public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStrea
             }
     
             public KafkaSpoutConfig<K,V> build() {
    +            validate();
                 return new KafkaSpoutConfig<>(this);
             }
    +
    +        /**
    +         * Defines whether the consumer manages partition manually.
    +         * If set to true, the consumer behaves like a simple consumer, otherwise it will rely on kafka to do partition assignment.
    --- End diff --
    
    Fixed in the new PR.


---

[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r93008418
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -236,6 +257,35 @@ private boolean poll() {
             return poll;
         }
     
    +    private void refreshPartitionIfNeeded() {
    +        if (!manualPartitionAssignment || !partitionRefreshTimer.isExpiredResetOnTrue()) return;
    --- End diff --
    
    Fixed in the new PR.


---

[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r93008672
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -236,6 +257,35 @@ private boolean poll() {
             return poll;
         }
     
    +    private void refreshPartitionIfNeeded() {
    +        if (!manualPartitionAssignment || !partitionRefreshTimer.isExpiredResetOnTrue()) return;
    +        doRefreshPartitions();
    +    }
    +
    +    private void doRefreshPartitions() {
    +        KafkaSpoutStreamsNamedTopics streams = KafkaSpoutStreamsNamedTopics.class.cast(kafkaSpoutStreams);
    --- End diff --
    
    1. Support for wildcard topics has been added.
    2. `Class.cast` is preferred because, on a type mismatch, it throws a `ClassCastException` that explains the failure better than an NPE would.
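The difference point 2 refers to can be seen in a small, self-contained sketch. The stream classes below are hypothetical stand-ins for `KafkaSpoutStreamsNamedTopics` and `KafkaSpoutStreamsWildcardTopics`. The first method uses `Class.cast`, which fails at once with a `ClassCastException` whose message names both the actual and the expected class. The second uses an `instanceof` guard that falls back to null. That version also fails, but the resulting NPE says nothing about which type was actually passed in.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for the spout's stream descriptions.
interface Streams {}

final class NamedTopicStreams implements Streams {
    List<String> getTopics() {
        return Arrays.asList("orders", "payments");
    }
}

final class WildcardTopicStreams implements Streams {}

final class CastDemo {
    // Fails fast with "Cannot cast <actual> to <expected>".
    // Note: Class.cast(null) returns null instead of throwing, so a null argument still ends in an NPE.
    static List<String> topicsViaClassCast(Streams streams) {
        return NamedTopicStreams.class.cast(streams).getTopics();
    }

    // A type mismatch silently becomes null; the NPE that follows does not mention the real type.
    static List<String> topicsViaInstanceof(Streams streams) {
        NamedTopicStreams named = streams instanceof NamedTopicStreams ? (NamedTopicStreams) streams : null;
        return named.getTopics();
    }
}
```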


---

[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r92848870
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -360,18 +411,22 @@ private void subscribeKafkaConsumer() {
             kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
                     kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
     
    -        if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
    -            final List<String> topics = ((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics();
    -            kafkaConsumer.subscribe(topics, new KafkaSpoutConsumerRebalanceListener());
    -            LOG.info("Kafka consumer subscribed topics {}", topics);
    -        } else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) {
    -            final Pattern pattern = ((KafkaSpoutStreamsWildcardTopics) kafkaSpoutStreams).getTopicWildcardPattern();
    -            kafkaConsumer.subscribe(pattern, new KafkaSpoutConsumerRebalanceListener());
    -            LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern);
    +        if (manualPartitionAssignment) {
    +            doRefreshPartitions();
    --- End diff --
    
    Nit, but it would probably be good to log which topics are subscribed here as well.
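Under manual assignment there is no `subscribe(Pattern)` call to resolve topics, so the spout has to do it itself. Here is a minimal sketch of that step, with a log-friendly summary as suggested above. It assumes the full topic list has already been fetched from the cluster, for example via `KafkaConsumer.listTopics()`. The `TopicResolver` class and its methods are hypothetical, and dropping named topics that don't exist is a design choice of this sketch.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper: resolve which topics a manually assigned spout should read.
final class TopicResolver {
    // Named topics: keep those that exist on the cluster, preserving the configured order.
    static List<String> resolveNamed(List<String> configured, Collection<String> available) {
        List<String> result = new ArrayList<>();
        for (String topic : configured) {
            if (available.contains(topic)) {
                result.add(topic);
            }
        }
        return result;
    }

    // Wildcard topics: every available topic whose full name matches the pattern.
    static List<String> resolveWildcard(Pattern pattern, Collection<String> available) {
        List<String> result = new ArrayList<>();
        for (String topic : available) {
            if (pattern.matcher(topic).matches()) {
                result.add(topic);
            }
        }
        Collections.sort(result); // stable order so every task agrees
        return result;
    }

    // Message suitable for LOG.info, so operators can see which topics were picked up.
    static String describe(List<String> topics) {
        return "Kafka consumer assigned to partitions of topics " + topics;
    }
}
```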


---

[GitHub] storm issue #1825: STORM-2236 storm kafka client support manual partition ma...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1825
  
    @liurenjie1024 Okay, it makes sense that having Kafka manage partitions isn't really necessary, since Storm will make sure to keep spout instances running. I can't speak to how much overhead and unpredictability letting Kafka manage assignment really introduces, but it's fine by me if we want to switch to manual management. I can't think of a good reason to support both manual and automatic assignment, though.
    
    The reason the old Kafka spout did manual assignment was that, as far as I know, automatic assignment wasn't available in the old APIs.
    
    I agree that we shouldn't have two variants of the spout, it just adds unnecessary complexity. I'd rather we just do a solid manual assignment implementation and get rid of the automatic code, if we need to switch.
    
    Keep in mind that, to be on par with automatic assignment, manual assignment needs to support adding partitions without requiring a spout restart. It also needs to do so without the spout instances stepping on each other's toes (e.g. avoiding partitions being assigned to two spout instances at once, even while reassigning).
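One way to approach the reassignment concern is to have each task do two things on every refresh. First, it recomputes its share from the sorted partition list. Second, it diffs that share against what it currently owns, so revoked partitions can have their acked offsets committed and be released before new ones are taken. The sketch below shows only the diffing step, with partitions as plain strings. `AssignmentRefresh` is a hypothetical name. The diff alone does not prevent overlap, because tasks refresh at slightly different times. Some coordination, or tolerance for brief duplicate reads under at-least-once semantics, would still be needed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of one refresh step for manual, round-robin assignment.
final class AssignmentRefresh {
    final Set<String> next = new LinkedHashSet<>();    // partitions this task should own now
    final Set<String> revoked = new LinkedHashSet<>(); // commit acked offsets, then release
    final Set<String> added = new LinkedHashSet<>();   // assign, then seek to committed offset

    AssignmentRefresh(Set<String> current, List<String> allPartitions, int taskIndex, int taskCount) {
        List<String> sorted = new ArrayList<>(allPartitions);
        // Lexicographic order is fine as long as every task uses the same one.
        Collections.sort(sorted);
        for (int i = taskIndex; i < sorted.size(); i += taskCount) {
            next.add(sorted.get(i));
        }
        for (String p : current) {
            if (!next.contains(p)) {
                revoked.add(p);
            }
        }
        for (String p : next) {
            if (!current.contains(p)) {
                added.add(p);
            }
        }
    }
}
```

Suppose task 0 of 2 owns {b-0, b-2} and a new topic `a` appears that sorts first. Task 0 moves to {a-0, b-1}, and task 1 is expected to take over b-0 and b-2. Until both tasks have refreshed, b-1 could be read by two tasks at once. That is exactly the overlap the comment above warns about.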
    
    @hmcl Sure, it's more readable if strategies like this can be sectioned off in their own classes. If we really have to support both, I think it's nicer if assignment can be implemented through delegates rather than through subclassing.
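Here is a minimal sketch of the delegate approach suggested above: the spout holds a strategy object instead of being subclassed once per assignment mode. All type names are hypothetical. The consumer is reduced to a small stub interface so the example runs without kafka-clients. In a real spout the two strategies would presumably call `KafkaConsumer.subscribe(...)` and `KafkaConsumer.assign(...)` respectively.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stub for the subset of consumer calls the strategies need.
interface ConsumerOps {
    void subscribe(List<String> topics);
    void assign(List<String> partitions);
}

// The delegate: one interface, one implementation per assignment mode.
interface AssignmentStrategy {
    void apply(ConsumerOps consumer, int taskIndex, int taskCount);
}

final class KafkaManagedAssignment implements AssignmentStrategy {
    private final List<String> topics;

    KafkaManagedAssignment(List<String> topics) { this.topics = topics; }

    @Override
    public void apply(ConsumerOps consumer, int taskIndex, int taskCount) {
        consumer.subscribe(topics); // the group coordinator decides who reads what
    }
}

final class RoundRobinAssignment implements AssignmentStrategy {
    private final List<String> sortedPartitions;

    RoundRobinAssignment(List<String> sortedPartitions) { this.sortedPartitions = sortedPartitions; }

    @Override
    public void apply(ConsumerOps consumer, int taskIndex, int taskCount) {
        List<String> mine = new ArrayList<>();
        for (int i = taskIndex; i < sortedPartitions.size(); i += taskCount) {
            mine.add(sortedPartitions.get(i));
        }
        consumer.assign(mine); // the spout decides; no group rebalancing involved
    }
}

// The spout composes a strategy rather than overriding methods in subclasses.
final class SpoutSketch {
    private final AssignmentStrategy strategy;

    SpoutSketch(AssignmentStrategy strategy) { this.strategy = strategy; }

    void open(ConsumerOps consumer, int taskIndex, int taskCount) {
        strategy.apply(consumer, taskIndex, taskCount);
    }
}
```

The spout's emit, ack and commit logic stays in one class. Supporting another mode means adding a strategy rather than another spout subclass.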


---

[GitHub] storm issue #1825: STORM-2236 storm kafka client support manual partition ma...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on the issue:

    https://github.com/apache/storm/pull/1825
  
    @srdo I'll submit a patch for that later.


---

[GitHub] storm issue #1825: STORM-2236 storm kafka client support manual partition ma...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on the issue:

    https://github.com/apache/storm/pull/1825
  
    I think the consumer's automatic partition management is complicated and may lead to unpredictable behavior. Predictability is quite important for many purposes, such as debugging and performance tuning, especially in critical systems like the billing system in our case. In fact, the KafkaConsumer documentation itself suggests manual partition assignment when the consumer runs as part of a stream processing framework. See the "Manual Partition Assignment" section of the [KafkaConsumer javadoc](http://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html). Other stream processing frameworks and clients, such as Flink, Apex, Spark Streaming and the old storm-kafka spout, also use manual partition management.


---

[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r92845054
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -200,16 +215,22 @@ private void setAcked(TopicPartition tp, long fetchOffset) {
         @Override
         public void nextTuple() {
             if (initialized) {
    -            if (commit()) {
    -                commitOffsetsForAckedTuples();
    -            }
    +            try {
    +                refreshPartitionIfNeeded();
     
    -            if (poll()) {
    -                setWaitingToEmit(pollKafkaBroker());
    -            }
    +                if (commit()) {
    +                    commitOffsetsForAckedTuples();
    +                }
     
    -            if (waitingToEmit()) {
    -                emit();
    +                if (poll()) {
    +                    setWaitingToEmit(pollKafkaBroker());
    +                }
    +
    +                if (waitingToEmit()) {
    +                    emit();
    +                }
    +            } catch (Exception e) {
    --- End diff --
    
    Why are you catching this? If the spout encounters some unexpected exception, we want it to crash I think.
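Here is a minimal sketch of the fail-fast alternative the reviewer suggests. `nextTuple` does its work without a blanket `catch (Exception e)`, so an unexpected failure propagates out of the spout and Storm's normal error handling (typically the worker dying and being restarted) takes over. Otherwise the spout keeps being called in a possibly broken state. `NextTupleSketch` and its steps are hypothetical. The steps are `Runnable`s because the real `nextTuple()` declares no checked exceptions.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch contrasting a swallowing nextTuple with a fail-fast one.
final class NextTupleSketch {
    private final List<Runnable> steps; // e.g. refresh partitions, commit, poll, emit
    int hiddenFailures = 0;             // failures the swallowing variant has hidden

    NextTupleSketch(Runnable... steps) {
        this.steps = Arrays.asList(steps);
    }

    // Anti-pattern: a broken consumer goes unnoticed and nextTuple keeps being called.
    void nextTupleSwallowing() {
        try {
            runSteps();
        } catch (Exception e) {
            hiddenFailures++;
        }
    }

    // Fail fast: the RuntimeException escapes nextTuple, so Storm sees the failure.
    void nextTupleFailFast() {
        runSteps();
    }

    private void runSteps() {
        for (Runnable step : steps) {
            step.run();
        }
    }
}
```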


---

[GitHub] storm issue #1825: STORM-2236 storm kafka client support manual partition ma...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on the issue:

    https://github.com/apache/storm/pull/1825
  
    Hi @srdo, I've submitted a new patch for this issue against the master branch, with some refactoring. Most of your comments were great, and I've addressed them in the new patch. Please help review [this PR](https://github.com/apache/storm/pull/1835); this PR will then be closed.


---

[GitHub] storm pull request #1825: STORM-2236 storm kafka client support manual parti...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1825#discussion_r92844773
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -236,6 +257,35 @@ private boolean poll() {
             return poll;
         }
     
    +    private void refreshPartitionIfNeeded() {
    --- End diff --
    
    Nit: Rename to something more precise, like refreshAssignedPartitionsIfNeeded


---