You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by liurenjie1024 <gi...@git.apache.org> on 2016/12/19 10:16:28 UTC

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

GitHub user liurenjie1024 opened a pull request:

    https://github.com/apache/storm/pull/1835

    STORM-2236: Kafka Spout with manual partition management.

    As the title says.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/MediaV/storm STORM-2236

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1835.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1835
    
----
commit e4aa489b6ff6a26db7492e14270b854fca7d7c98
Author: liurenjie1024 <li...@gmail.com>
Date:   2016-12-19T10:10:30Z

    Manual partition management.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93164660
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -75,6 +78,7 @@
         private transient KafkaSpoutRetryService retryService;              // Class that has the logic to handle tuple failure
         private transient Timer commitTimer;                                // timer == null for auto commit mode
         private transient boolean initialized;                              // Flag indicating that the spout is still undergoing initialization process.
    +    private transient KafkaRecordsFetcher<K, V> recordsFetcher;         // Class that encapsulate the log of partition management
    --- End diff --
    
    Sorry, that's a typo. I've fixed in the new patch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93108651
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/ManualKafkaRecordsFetcher.java ---
    @@ -0,0 +1,88 @@
    +package org.apache.storm.kafka.spout.internal.fetcher;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.KafkaPartitionReader;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.TopicPartitionComparator;
    +
    +import java.util.*;
    +
    +public class ManualKafkaRecordsFetcher<K, V> implements KafkaRecordsFetcher<K, V> {
    +    private static final Comparator<TopicPartition> KAFKA_TOPIC_PARTITION_COMPARATOR = new TopicPartitionComparator();
    +
    +    private final KafkaConsumer<K, V> consumer;
    +    private final int thisTaskIndex;
    +    private final int totalTaskCount;
    +    private final KafkaPartitionReader partitionReader;
    +    private final KafkaSpout.Timer partitionRefreshTimer;
    +    private final PartitionAssignmentChangeListener partitionAssignmentChangeListener;
    +    private Set<TopicPartition> myPartitions = Collections.emptySet();
    +
    +    public ManualKafkaRecordsFetcher(KafkaConsumer<K, V> consumer,
    +                                     int thisTaskIndex,
    +                                     int totalTaskCount,
    +                                     KafkaPartitionReader partitionReader,
    +                                     KafkaSpout.Timer partitionRefreshTimer,
    +                                     PartitionAssignmentChangeListener partitionAssignmentChangeListener) {
    +        this.consumer = consumer;
    +        this.thisTaskIndex = thisTaskIndex;
    +        this.totalTaskCount = totalTaskCount;
    +        this.partitionReader = partitionReader;
    +        this.partitionRefreshTimer = partitionRefreshTimer;
    +        this.partitionAssignmentChangeListener = partitionAssignmentChangeListener;
    +
    +        doRefreshMyPartitions();
    +    }
    +
    +    private void refreshMyPartitionsIfNeed() {
    +        if (!partitionRefreshTimer.isExpiredResetOnTrue()) {
    +            return;
    +        }
    +
    +        doRefreshMyPartitions();
    +    }
    +
    +    private void doRefreshMyPartitions() {
    +        List<TopicPartition> topicPartitions = partitionReader.readPartitions(consumer);
    +        Collections.sort(topicPartitions, KAFKA_TOPIC_PARTITION_COMPARATOR);
    +
    +        Set<TopicPartition> curPartitions = new HashSet<>(topicPartitions.size()/totalTaskCount+1);
    +        for (int i=thisTaskIndex; i<topicPartitions.size(); i+=totalTaskCount) {
    +            curPartitions.add(topicPartitions.get(i));
    +        }
    +
    +        if (!myPartitions.equals(curPartitions) && myPartitions!=null) {
    +            partitionAssignmentChangeListener.onPartitionAssignmentChange(myPartitions, curPartitions);
    +        }
    +
    +        myPartitions = curPartitions;
    +
    +        consumer.assign(myPartitions);
    +    }
    +
    +    @Override
    +    public ConsumerRecords<K, V> fetchRecords(long fetchTimeoutMs) {
    +        refreshMyPartitionsIfNeed();
    +
    +        return consumer.poll(fetchTimeoutMs);
    +    }
    +
    +    public interface PartitionAssignmentChangeListener {
    +        void onPartitionAssignmentChange(Set<TopicPartition> oldPartitions, Set<TopicPartition> newPartitions);
    +    }
    +
    +    public static PartitionAssignmentChangeListener listenerOf(final ConsumerRebalanceListener consumerRebalanceListener) {
    +        return new PartitionAssignmentChangeListener() {
    --- End diff --
    
    You don't have to change it, but on master we're targeting Java 8, so lambdas are an option for this kind of thing :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93164910
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaPartitionReader.java ---
    @@ -0,0 +1,10 @@
    +package org.apache.storm.kafka.spout;
    +
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.common.TopicPartition;
    +
    +import java.util.List;
    +
    +public interface KafkaPartitionReader {
    --- End diff --
    
    Moved to another package.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93164850
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -218,7 +224,11 @@ public void nextTuple() {
                 }
     
                 if (poll()) {
    -                setWaitingToEmit(pollKafkaBroker());
    +                try {
    +                    setWaitingToEmit(pollKafkaBroker());
    +                } catch (Exception e) {
    +                    LOG.error("Failed to poll from kafka.", e);
    --- End diff --
    
    We have encountered many exceptions: KafkaFetchException, LeaderNoFoundException, etc. These are caused by temporarily kafka partition migration. I don't think we should restart the process in these cases.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93551580
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -218,7 +223,11 @@ public void nextTuple() {
                 }
     
                 if (poll()) {
    -                setWaitingToEmit(pollKafkaBroker());
    +                try {
    +                    setWaitingToEmit(pollKafkaBroker());
    +                } catch (KafkaException e) {
    --- End diff --
    
    @srdo good point, but the main point is also that the code that is handling the interrupt exceptions is very fragile. Right now you were able to catch this, but it's very difficult for other devs to code accounting for that.
    
    @liurenjie1024 we are basically catching and logging the exception. Is that really the intent? Is there a scenario that we should not catch the exception, in which we would rather let the process die, and then restart ? The way the code currently works, if no new records are polled anything (i.e. an exception occurred)  `waitingToEmit` will always be false.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93837486
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/KafkaRecordsFetchers.java ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout.internal.fetcher;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    +import org.apache.storm.kafka.spout.internal.Timer;
    +import org.apache.storm.kafka.spout.internal.partition.KafkaPartitionReader;
    +import org.apache.storm.kafka.spout.internal.partition.KafkaPartitionReaders;
    +import org.apache.storm.task.TopologyContext;
    +
    +import java.util.concurrent.TimeUnit;
    +
    +public final class KafkaRecordsFetchers {
    +    public static <K, V> KafkaRecordsFetcher<K, V> create(KafkaSpoutConfig kafkaSpoutConfig,
    +                                                          KafkaConsumer<K, V> consumer,
    +                                                          TopologyContext context,
    +                                                          ConsumerRebalanceListener rebalanceListener) {
    +        if (kafkaSpoutConfig.isManualPartitionAssignment()) {
    +            int thisTaskIndex = context.getThisTaskIndex();
    +            int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size();
    +            KafkaPartitionReader partitionReader = KafkaPartitionReaders.create(
    +                kafkaSpoutConfig.getKafkaSpoutStreams());
    +            Timer partitionRefreshTimer = new Timer(500,
    +                kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
    +
    +            ManualKafkaRecordsFetcher.PartitionAssignmentChangeListener partitionAssignmentChangeListener = null;
    +            if (rebalanceListener != null) {
    --- End diff --
    
    I can't figure out a case for a null rebalance listener now, so this maybe an overdesign here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93164674
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -120,6 +124,8 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect
                 commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
             }
     
    +        // Manual partition assignment
    --- End diff --
    
    Fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93555336
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/KafkaRecordsFetchers.java ---
    @@ -0,0 +1,41 @@
    +package org.apache.storm.kafka.spout.internal.fetcher;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.storm.kafka.spout.KafkaPartitionReader;
    +import org.apache.storm.kafka.spout.KafkaPartitionReaders;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    +import org.apache.storm.task.TopologyContext;
    +
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Created by liurenjie on 19/12/2016.
    + */
    +public final class KafkaRecordsFetchers {
    +    public static <K, V> KafkaRecordsFetcher<K, V> kafkaRecordsFetcherOf(KafkaSpoutConfig kafkaSpoutConfig,
    +                                                                         KafkaConsumer<K, V> consumer,
    +                                                                         TopologyContext context,
    +                                                                         ConsumerRebalanceListener rebalanceListener) {
    +        if (kafkaSpoutConfig.getManualPartitionAssignment()) {
    --- End diff --
    
    Agree. If you need to reuse some code across the manual and automatic mode, put it in a base abstract class, and then provide two implementations, one for auto and another for manual.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93554219
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -61,6 +64,7 @@
     
         // Storm
         protected SpoutOutputCollector collector;
    +    private TopologyContext topologyContext;
    --- End diff --
    
    is topologyContext used anywhere? If not, there is no need to create and set this field.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93553634
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/ManualKafkaRecordsFetcher.java ---
    @@ -0,0 +1,85 @@
    +package org.apache.storm.kafka.spout.internal.fetcher;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.internal.partition.KafkaPartitionReader;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.TopicPartitionComparator;
    +
    +import java.util.*;
    --- End diff --
    
    Please remove wildcard imports


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1835: STORM-2236: Kafka Spout with manual partition management.

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1835
  
    +1 once the merge conflict is resolved. It would probably be good to add some unit tests of the new classes, but I think this can be merged without them.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93558408
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/KafkaPartitionReaders.java ---
    @@ -0,0 +1,27 @@
    +package org.apache.storm.kafka.spout.internal.partition;
    +
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.KafkaSpoutStreams;
    +import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics;
    +import org.apache.storm.kafka.spout.KafkaSpoutStreamsWildcardTopics;
    +
    +import java.util.HashSet;
    +
    +public final class KafkaPartitionReaders {
    +    public static KafkaPartitionReader partitionReaderOf(KafkaSpoutStreams kafkaSpoutStreams) {
    +        if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
    +            return new NamedTopicPartitionReader(new HashSet<>(
    +                KafkaSpoutStreamsNamedTopics.class.cast(kafkaSpoutStreams).getTopics()));
    +        } else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) {
    +            return new WildcardTopicPartitionReader(
    +                KafkaSpoutStreamsWildcardTopics.class.cast(kafkaSpoutStreams).getTopicWildcardPattern());
    +        } else {
    +            throw new IllegalArgumentException("Unrecognized kafka spout stream: " + kafkaSpoutStreams.getClass());
    +        }
    +    }
    +
    +    public static TopicPartition convert(PartitionInfo partitionInfo) {
    --- End diff --
    
    call this toTopicPartition


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1835: STORM-2236: Kafka Spout with manual partition management.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1835
  
    @liurenjie1024 @srdo lets wait till all the pending comments are resolved.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93556016
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -307,6 +333,14 @@ public KafkaSpoutRetryService getRetryService() {
             return retryService;
         }
     
    +    public long getPartitionRefreshPeriodMs() {
    +        return partitionRefreshPeriodMs;
    +    }
    +
    +    public boolean getManualPartitionAssignment() {
    --- End diff --
    
    Fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93164626
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaPartitionReaders.java ---
    @@ -0,0 +1,27 @@
    +package org.apache.storm.kafka.spout;
    +
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.TopicPartition;
    +
    +import java.util.HashSet;
    +
    +/**
    + * Created by liurenjie on 19/12/2016.
    --- End diff --
    
    Fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1835: STORM-2236: Kafka Spout with manual partition management.

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1835
  
    @liurenjie1024 You have access. You need to open your branch in the Git client you usually use and pull the latest commits from master on this repo (https://github.com/apache/storm.git). If you like you can use `git pull --rebase` to avoid a merge commit in the history. Once your branch is up to date, push it back to your fork. The PR will update automatically.
    
    Here's an example of how it can be done from command line (I have a remote called "upstream" that points to this repository, if you don't have one like it use `git remote add upstream https://github.com/apache/storm.git` to add it).
    ```
    git checkout STORM-2236
    git pull upstream master --rebase //you'll get a merge conflict
    git mergetool //resolve merge conflict (you can also use whatever git GUI you usually use)
    git add <the files you resolved conflicts for>
    git rebase --continue
    //do the above 3 lines until the rebase is over
    //At this point you should verify that the branch diff to master looks reasonable. If it looks fine, push to your branch with the following.
    git push --force
    ```



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93164866
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/ManualKafkaRecordsFetcher.java ---
    @@ -0,0 +1,88 @@
    +package org.apache.storm.kafka.spout.internal.fetcher;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.KafkaPartitionReader;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.TopicPartitionComparator;
    +
    +import java.util.*;
    +
    +public class ManualKafkaRecordsFetcher<K, V> implements KafkaRecordsFetcher<K, V> {
    +    private static final Comparator<TopicPartition> KAFKA_TOPIC_PARTITION_COMPARATOR = new TopicPartitionComparator();
    +
    +    private final KafkaConsumer<K, V> consumer;
    +    private final int thisTaskIndex;
    +    private final int totalTaskCount;
    +    private final KafkaPartitionReader partitionReader;
    +    private final KafkaSpout.Timer partitionRefreshTimer;
    +    private final PartitionAssignmentChangeListener partitionAssignmentChangeListener;
    +    private Set<TopicPartition> myPartitions = Collections.emptySet();
    +
    +    public ManualKafkaRecordsFetcher(KafkaConsumer<K, V> consumer,
    +                                     int thisTaskIndex,
    +                                     int totalTaskCount,
    +                                     KafkaPartitionReader partitionReader,
    +                                     KafkaSpout.Timer partitionRefreshTimer,
    +                                     PartitionAssignmentChangeListener partitionAssignmentChangeListener) {
    +        this.consumer = consumer;
    +        this.thisTaskIndex = thisTaskIndex;
    +        this.totalTaskCount = totalTaskCount;
    +        this.partitionReader = partitionReader;
    +        this.partitionRefreshTimer = partitionRefreshTimer;
    +        this.partitionAssignmentChangeListener = partitionAssignmentChangeListener;
    +
    +        doRefreshMyPartitions();
    +    }
    +
    +    private void refreshMyPartitionsIfNeed() {
    --- End diff --
    
    Fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93559465
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/ManualKafkaRecordsFetcher.java ---
    @@ -0,0 +1,85 @@
    +package org.apache.storm.kafka.spout.internal.fetcher;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.internal.partition.KafkaPartitionReader;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.TopicPartitionComparator;
    +
    +import java.util.*;
    +
    +public class ManualKafkaRecordsFetcher<K, V> implements KafkaRecordsFetcher<K, V> {
    +    private static final Comparator<TopicPartition> KAFKA_TOPIC_PARTITION_COMPARATOR = new TopicPartitionComparator();
    +
    +    private final KafkaConsumer<K, V> consumer;
    +    private final int thisTaskIndex;
    +    private final int totalTaskCount;
    +    private final KafkaPartitionReader partitionReader;
    +    private final KafkaSpout.Timer partitionRefreshTimer;
    +    private final PartitionAssignmentChangeListener partitionAssignmentChangeListener;
    +    private Set<TopicPartition> myPartitions = Collections.emptySet();
    +
    +    public ManualKafkaRecordsFetcher(KafkaConsumer<K, V> consumer,
    +                                     int thisTaskIndex,
    +                                     int totalTaskCount,
    +                                     KafkaPartitionReader partitionReader,
    +                                     KafkaSpout.Timer partitionRefreshTimer,
    +                                     PartitionAssignmentChangeListener partitionAssignmentChangeListener) {
    +        this.consumer = consumer;
    +        this.thisTaskIndex = thisTaskIndex;
    +        this.totalTaskCount = totalTaskCount;
    +        this.partitionReader = partitionReader;
    +        this.partitionRefreshTimer = partitionRefreshTimer;
    +        this.partitionAssignmentChangeListener = partitionAssignmentChangeListener;
    +
    +        doRefreshMyPartitions();
    +    }
    +
    +    private void refreshMyPartitionsIfNeeded() {
    +        if (!partitionRefreshTimer.isExpiredResetOnTrue()) {
    +            return;
    +        }
    +
    +        doRefreshMyPartitions();
    +    }
    +
    +    private void doRefreshMyPartitions() {
    +        List<TopicPartition> topicPartitions = partitionReader.readPartitions(consumer);
    +        Collections.sort(topicPartitions, KAFKA_TOPIC_PARTITION_COMPARATOR);
    --- End diff --
    
    This sort ensures that all instances of KafkaSpout see the same ordered list so that the calculation is correct.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1835: STORM-2236: Kafka Spout with manual partition management.

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on the issue:

    https://github.com/apache/storm/pull/1835
  
    @harshach OK I'll submit a PR for that later.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1835


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1835: STORM-2236: Kafka Spout with manual partition management.

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1835
  
    @hmcl You might want to take a look at this too


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93314715
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/AutomaticKafkaRecordsFetcher.java ---
    @@ -0,0 +1,52 @@
    +package org.apache.storm.kafka.spout.internal.fetcher;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.storm.kafka.spout.KafkaSpoutStreams;
    +import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics;
    +import org.apache.storm.kafka.spout.KafkaSpoutStreamsWildcardTopics;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.regex.Pattern;
    +
    +/**
    --- End diff --
    
    please remove author names and add a apache license on top of the file


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1835: STORM-2236: Kafka Spout with manual partition management.

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1835
  
    @liurenjie1024 once the review is complete and it incorporates the feedback from all the reviewers (I am going through the review now), please squash the commits into one commit, and make sure that this commit has the title of the JIRA it is addressing, as well as a summary of its contents.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93661119
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java ---
    @@ -0,0 +1,60 @@
    +package org.apache.storm.kafka.spout.internal;
    +
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Created by liurenjie on 22/12/2016.
    --- End diff --
    
    You should really just edit the template in your IDE :)
    
    I'm making the same change here https://github.com/apache/storm/pull/1832/files#diff-73306e32282def8d045e0bcd2d33f59b btw.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93484518
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/KafkaRecordsFetchers.java ---
    @@ -0,0 +1,41 @@
    +package org.apache.storm.kafka.spout.internal.fetcher;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.storm.kafka.spout.KafkaPartitionReader;
    +import org.apache.storm.kafka.spout.KafkaPartitionReaders;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    +import org.apache.storm.task.TopologyContext;
    +
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Created by liurenjie on 19/12/2016.
    + */
    +public final class KafkaRecordsFetchers {
    +    public static <K, V> KafkaRecordsFetcher<K, V> kafkaRecordsFetcherOf(KafkaSpoutConfig kafkaSpoutConfig,
    +                                                                         KafkaConsumer<K, V> consumer,
    +                                                                         TopologyContext context,
    +                                                                         ConsumerRebalanceListener rebalanceListener) {
    +        if (kafkaSpoutConfig.getManualPartitionAssignment()) {
    --- End diff --
    
    @liurenjie1024 internal details or not we will keep adding one implementation or another so it's better to make this cleaner when we have an opportunity to do so.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93105747
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -120,6 +124,8 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect
                 commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
             }
     
    +        // Manual partition assignment
    --- End diff --
    
    This doesn't seem like it belongs here anymore


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1835: STORM-2236: Kafka Spout with manual partition management.

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on the issue:

    https://github.com/apache/storm/pull/1835
  
    Any update about the merge process of this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93107722
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/ManualKafkaRecordsFetcher.java ---
    @@ -0,0 +1,88 @@
    +package org.apache.storm.kafka.spout.internal.fetcher;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.KafkaPartitionReader;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.TopicPartitionComparator;
    +
    +import java.util.*;
    +
    +public class ManualKafkaRecordsFetcher<K, V> implements KafkaRecordsFetcher<K, V> {
    +    private static final Comparator<TopicPartition> KAFKA_TOPIC_PARTITION_COMPARATOR = new TopicPartitionComparator();
    +
    +    private final KafkaConsumer<K, V> consumer;
    +    private final int thisTaskIndex;
    +    private final int totalTaskCount;
    +    private final KafkaPartitionReader partitionReader;
    +    private final KafkaSpout.Timer partitionRefreshTimer;
    +    private final PartitionAssignmentChangeListener partitionAssignmentChangeListener;
    +    private Set<TopicPartition> myPartitions = Collections.emptySet();
    +
    +    public ManualKafkaRecordsFetcher(KafkaConsumer<K, V> consumer,
    +                                     int thisTaskIndex,
    +                                     int totalTaskCount,
    +                                     KafkaPartitionReader partitionReader,
    +                                     KafkaSpout.Timer partitionRefreshTimer,
    +                                     PartitionAssignmentChangeListener partitionAssignmentChangeListener) {
    +        this.consumer = consumer;
    +        this.thisTaskIndex = thisTaskIndex;
    +        this.totalTaskCount = totalTaskCount;
    +        this.partitionReader = partitionReader;
    +        this.partitionRefreshTimer = partitionRefreshTimer;
    +        this.partitionAssignmentChangeListener = partitionAssignmentChangeListener;
    +
    +        doRefreshMyPartitions();
    +    }
    +
    +    private void refreshMyPartitionsIfNeed() {
    --- End diff --
    
    Nit: Should be IfNeeded


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1835: STORM-2236: Kafka Spout with manual partition management.

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1835
  
    +1. Just one minor nit on a name. can you please squash the commits and put a little summary of the patch in the commit message. Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1835: STORM-2236: Kafka Spout with manual partition management.

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1835
  
    LGTM. +1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1835: STORM-2236: Kafka Spout with manual partition management.

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on the issue:

    https://github.com/apache/storm/pull/1835
  
    Hi, all:
    All commits have been squashed into one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93105778
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -75,6 +78,7 @@
         private transient KafkaSpoutRetryService retryService;              // Class that has the logic to handle tuple failure
         private transient Timer commitTimer;                                // timer == null for auto commit mode
         private transient boolean initialized;                              // Flag indicating that the spout is still undergoing initialization process.
    +    private transient KafkaRecordsFetcher<K, V> recordsFetcher;         // Class that encapsulate the log of partition management
    --- End diff --
    
    I don't understand the comment


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93723547
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java ---
    @@ -0,0 +1,60 @@
    +package org.apache.storm.kafka.spout.internal;
    +
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Created by liurenjie on 22/12/2016.
    --- End diff --
    
    Sorry for that. I've fixed it. I'll merge your change later after you submitted it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r94527249
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/KafkaPartitionReaders.java ---
    @@ -0,0 +1,27 @@
    +package org.apache.storm.kafka.spout.internal.partition;
    +
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.KafkaSpoutStreams;
    +import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics;
    +import org.apache.storm.kafka.spout.KafkaSpoutStreamsWildcardTopics;
    +
    +import java.util.HashSet;
    +
    +public final class KafkaPartitionReaders {
    +    public static KafkaPartitionReader partitionReaderOf(KafkaSpoutStreams kafkaSpoutStreams) {
    +        if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
    +            return new NamedTopicPartitionReader(new HashSet<>(
    +                KafkaSpoutStreamsNamedTopics.class.cast(kafkaSpoutStreams).getTopics()));
    +        } else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) {
    +            return new WildcardTopicPartitionReader(
    +                KafkaSpoutStreamsWildcardTopics.class.cast(kafkaSpoutStreams).getTopicWildcardPattern());
    +        } else {
    +            throw new IllegalArgumentException("Unrecognized kafka spout stream: " + kafkaSpoutStreams.getClass());
    +        }
    +    }
    +
    +    public static TopicPartition convert(PartitionInfo partitionInfo) {
    --- End diff --
    
    Fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1835: STORM-2236: Kafka Spout with manual partition management.

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on the issue:

    https://github.com/apache/storm/pull/1835
  
    @srdo Do I have the access to resolve the conflicts? Or it should be resolved by those who merge the code?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93180216
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -218,7 +224,11 @@ public void nextTuple() {
                 }
     
                 if (poll()) {
    -                setWaitingToEmit(pollKafkaBroker());
    +                try {
    +                    setWaitingToEmit(pollKafkaBroker());
    +                } catch (Exception e) {
    +                    LOG.error("Failed to poll from kafka.", e);
    --- End diff --
    
    As kafka client's doc says, I've changed the exception to KafkaException.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1835: STORM-2236: Kafka Spout with manual partition management.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1835
  
    @liurenjie1024 thanks for you patience. Merged into master. I would like to merge this into 1.x-branch as well. You are using FunctionalInterface and 1.x branch we are still on java 7 . Do you want to change that open another PR for 1.x-branch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93559928
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/ManualKafkaRecordsFetcher.java ---
    @@ -0,0 +1,85 @@
    +package org.apache.storm.kafka.spout.internal.fetcher;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.internal.partition.KafkaPartitionReader;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.TopicPartitionComparator;
    +
    +import java.util.*;
    --- End diff --
    
    Fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1835: STORM-2236: Kafka Spout with manual partition management.

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1835
  
    Please copy the Apache license into all the new files. You can just grab it from the top of KafkaSpout.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1835: STORM-2236: Kafka Spout with manual partition management.

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on the issue:

    https://github.com/apache/storm/pull/1835
  
    Hi, all: 
    I've resolved the comments and when this patch can be merged into master? I've read trident source code and it seems that there exists some bugs there, and the bug fix may rely on codes in this branch, so I hope it can be merge ASAP.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93781643
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/KafkaRecordsFetchers.java ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout.internal.fetcher;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    +import org.apache.storm.kafka.spout.internal.Timer;
    +import org.apache.storm.kafka.spout.internal.partition.KafkaPartitionReader;
    +import org.apache.storm.kafka.spout.internal.partition.KafkaPartitionReaders;
    +import org.apache.storm.task.TopologyContext;
    +
    +import java.util.concurrent.TimeUnit;
    +
    +public final class KafkaRecordsFetchers {
    +    public static <K, V> KafkaRecordsFetcher<K, V> create(KafkaSpoutConfig kafkaSpoutConfig,
    +                                                          KafkaConsumer<K, V> consumer,
    +                                                          TopologyContext context,
    +                                                          ConsumerRebalanceListener rebalanceListener) {
    +        if (kafkaSpoutConfig.isManualPartitionAssignment()) {
    +            int thisTaskIndex = context.getThisTaskIndex();
    +            int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size();
    +            KafkaPartitionReader partitionReader = KafkaPartitionReaders.create(
    +                kafkaSpoutConfig.getKafkaSpoutStreams());
    +            Timer partitionRefreshTimer = new Timer(500,
    +                kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
    +
    +            ManualKafkaRecordsFetcher.PartitionAssignmentChangeListener partitionAssignmentChangeListener = null;
    +            if (rebalanceListener != null) {
    --- End diff --
    
    Won't this cause an NPE in ManualKafkaRecordsFetcher if the null case is hit here? Why does it make sense to call create without a rebalance listener?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1835: STORM-2236: Kafka Spout with manual partition management.

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on the issue:

    https://github.com/apache/storm/pull/1835
  
    @harshach @srdo @hmcl Any updates?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93108849
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaPartitionReader.java ---
    @@ -0,0 +1,10 @@
    +package org.apache.storm.kafka.spout;
    +
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.common.TopicPartition;
    +
    +import java.util.List;
    +
    +public interface KafkaPartitionReader {
    --- End diff --
    
    Nit: Some of these classes could probably go in the internal package, or a new subpackage of the internal package.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93105952
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -218,7 +224,11 @@ public void nextTuple() {
                 }
     
                 if (poll()) {
    -                setWaitingToEmit(pollKafkaBroker());
    +                try {
    +                    setWaitingToEmit(pollKafkaBroker());
    +                } catch (Exception e) {
    +                    LOG.error("Failed to poll from kafka.", e);
    --- End diff --
    
    Could you share what exception you're seeing here? We haven't seen the spout be unstable due to exceptions from here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93558112
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/ManualKafkaRecordsFetcher.java ---
    @@ -0,0 +1,85 @@
    +package org.apache.storm.kafka.spout.internal.fetcher;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.internal.partition.KafkaPartitionReader;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.TopicPartitionComparator;
    +
    +import java.util.*;
    +
    +public class ManualKafkaRecordsFetcher<K, V> implements KafkaRecordsFetcher<K, V> {
    +    private static final Comparator<TopicPartition> KAFKA_TOPIC_PARTITION_COMPARATOR = new TopicPartitionComparator();
    +
    +    private final KafkaConsumer<K, V> consumer;
    +    private final int thisTaskIndex;
    +    private final int totalTaskCount;
    +    private final KafkaPartitionReader partitionReader;
    +    private final KafkaSpout.Timer partitionRefreshTimer;
    +    private final PartitionAssignmentChangeListener partitionAssignmentChangeListener;
    +    private Set<TopicPartition> myPartitions = Collections.emptySet();
    +
    +    public ManualKafkaRecordsFetcher(KafkaConsumer<K, V> consumer,
    +                                     int thisTaskIndex,
    +                                     int totalTaskCount,
    +                                     KafkaPartitionReader partitionReader,
    +                                     KafkaSpout.Timer partitionRefreshTimer,
    +                                     PartitionAssignmentChangeListener partitionAssignmentChangeListener) {
    +        this.consumer = consumer;
    +        this.thisTaskIndex = thisTaskIndex;
    +        this.totalTaskCount = totalTaskCount;
    +        this.partitionReader = partitionReader;
    +        this.partitionRefreshTimer = partitionRefreshTimer;
    +        this.partitionAssignmentChangeListener = partitionAssignmentChangeListener;
    +
    +        doRefreshMyPartitions();
    +    }
    +
    +    private void refreshMyPartitionsIfNeeded() {
    +        if (!partitionRefreshTimer.isExpiredResetOnTrue()) {
    +            return;
    +        }
    +
    +        doRefreshMyPartitions();
    +    }
    +
    +    private void doRefreshMyPartitions() {
    +        List<TopicPartition> topicPartitions = partitionReader.readPartitions(consumer);
    +        Collections.sort(topicPartitions, KAFKA_TOPIC_PARTITION_COMPARATOR);
    --- End diff --
    
    Why is this sort necessary ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93364905
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/KafkaRecordsFetchers.java ---
    @@ -0,0 +1,41 @@
    +package org.apache.storm.kafka.spout.internal.fetcher;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.storm.kafka.spout.KafkaPartitionReader;
    +import org.apache.storm.kafka.spout.KafkaPartitionReaders;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    +import org.apache.storm.task.TopologyContext;
    +
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Created by liurenjie on 19/12/2016.
    + */
    +public final class KafkaRecordsFetchers {
    +    public static <K, V> KafkaRecordsFetcher<K, V> kafkaRecordsFetcherOf(KafkaSpoutConfig kafkaSpoutConfig,
    +                                                                         KafkaConsumer<K, V> consumer,
    +                                                                         TopologyContext context,
    +                                                                         ConsumerRebalanceListener rebalanceListener) {
    +        if (kafkaSpoutConfig.getManualPartitionAssignment()) {
    --- End diff --
    
    I think just a flag is easier to use since the user has to write code to configure it otherwise, especially in the case of using flux to configure storm topology. Though your way is more flexible but I don't think we need the flexibility here since it's an internal implementation and the user do not need to know much about that. Let the user assign the implementation class should be used in cases like tuple builder. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93294533
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -218,7 +223,11 @@ public void nextTuple() {
                 }
     
                 if (poll()) {
    -                setWaitingToEmit(pollKafkaBroker());
    +                try {
    +                    setWaitingToEmit(pollKafkaBroker());
    +                } catch (KafkaException e) {
    --- End diff --
    
    I think you should catch https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/errors/RetriableException.html instead, it seems to cover all the exceptions that are expected to be transient (leader change for example, through InvalidMetadataException). Catching KafkaException will break interrupt handling (see https://github.com/apache/storm/pull/1821) because InterruptException is an instance of KafkaException.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93501465
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/KafkaRecordsFetchers.java ---
    @@ -0,0 +1,41 @@
    +package org.apache.storm.kafka.spout.internal.fetcher;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.storm.kafka.spout.KafkaPartitionReader;
    +import org.apache.storm.kafka.spout.KafkaPartitionReaders;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    +import org.apache.storm.task.TopologyContext;
    +
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Created by liurenjie on 19/12/2016.
    + */
    +public final class KafkaRecordsFetchers {
    +    public static <K, V> KafkaRecordsFetcher<K, V> kafkaRecordsFetcherOf(KafkaSpoutConfig kafkaSpoutConfig,
    +                                                                         KafkaConsumer<K, V> consumer,
    +                                                                         TopologyContext context,
    +                                                                         ConsumerRebalanceListener rebalanceListener) {
    +        if (kafkaSpoutConfig.getManualPartitionAssignment()) {
    --- End diff --
    
    @liurenjie1024 irrespective of if it's an internal detail or not we should go with the right approach. We have interfaces for a reason. If we keep adding implementations on how the records are being fetched we need to keep adding these if conditions. Given we are already making changes lets go with the right approach.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93104526
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaPartitionReaders.java ---
    @@ -0,0 +1,27 @@
    +package org.apache.storm.kafka.spout;
    +
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.TopicPartition;
    +
    +import java.util.HashSet;
    +
    +/**
    + * Created by liurenjie on 19/12/2016.
    --- End diff --
    
    Please remove these.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1835: STORM-2236: Kafka Spout with manual partition management.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1835
  
    @liurenjie1024 thanks for the updates. Overall looks good to me. Can you squash your commits into one commit. +1 after that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1835: STORM-2236: Kafka Spout with manual partition mana...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1835#discussion_r93555404
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/KafkaRecordsFetchers.java ---
    @@ -0,0 +1,41 @@
    +package org.apache.storm.kafka.spout.internal.fetcher;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.storm.kafka.spout.KafkaPartitionReader;
    +import org.apache.storm.kafka.spout.KafkaPartitionReaders;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    +import org.apache.storm.task.TopologyContext;
    +
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Created by liurenjie on 19/12/2016.
    + */
    +public final class KafkaRecordsFetchers {
    +    public static <K, V> KafkaRecordsFetcher<K, V> kafkaRecordsFetcherOf(KafkaSpoutConfig kafkaSpoutConfig,
    +                                                                         KafkaConsumer<K, V> consumer,
    +                                                                         TopologyContext context,
    +                                                                         ConsumerRebalanceListener rebalanceListener) {
    +        if (kafkaSpoutConfig.getManualPartitionAssignment()) {
    --- End diff --
    
    1. The if conditions are wrapped in a factory method so adding more cases will not pollute other components.
    2. The construction of instances can only happen in the open method since it needs topology context.
    3. Interface is used to hide the client from implementation details and I don't think our approach here violated the rule.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---