Posted to dev@storm.apache.org by bijanfahimi <gi...@git.apache.org> on 2017/08/17 19:53:54 UTC

[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

GitHub user bijanfahimi opened a pull request:

    https://github.com/apache/storm/pull/2282

    STORM-2694: Add KafkaTupleListener to storm-kafka-client

    This PR proposes an improvement to the KafkaSpout. A KafkaTupleListener interface can now be implemented to handle state changes of a tuple.
    Our primary use case is to put tuples that are discarded by the RetryService into a dead letter queue.
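    Below is a minimal sketch of the dead letter queue use case. It assumes a simplified, hypothetical `SimpleTupleListener` interface with plain `String` message ids in place of Storm's `KafkaSpoutMessageId`. Later callbacks receive only the message id, so the listener caches each tuple in `onEmit`. That way it still has the contents when `onMaxRetryReached` fires.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the listener interface discussed in this PR.
// Message ids are plain Strings here instead of Storm's KafkaSpoutMessageId.
interface SimpleTupleListener {
    void onEmit(List<Object> tuple, String msgId);

    void onAck(String msgId);

    void onMaxRetryReached(String msgId);
}

// Caches each emitted tuple until it is acked. When the retry service gives up on
// a tuple, the listener moves it to an in-memory dead letter list.
class DeadLetterListener implements SimpleTupleListener {
    private final Map<String, List<Object>> inFlight = new HashMap<>();
    private final List<List<Object>> deadLetters = new ArrayList<>();

    @Override
    public void onEmit(List<Object> tuple, String msgId) {
        // Later callbacks only receive the id, so remember the tuple contents now.
        inFlight.put(msgId, tuple);
    }

    @Override
    public void onAck(String msgId) {
        // Fully processed, so there is nothing to dead-letter.
        inFlight.remove(msgId);
    }

    @Override
    public void onMaxRetryReached(String msgId) {
        List<Object> tuple = inFlight.remove(msgId);
        if (tuple != null) {
            // A real implementation might publish this to a dead letter topic instead.
            deadLetters.add(tuple);
        }
    }

    List<List<Object>> deadLetters() {
        return deadLetters;
    }

    int inFlightCount() {
        return inFlight.size();
    }
}
```

    A real topology would more likely write to a separate Kafka topic. The in-memory list only keeps the sketch self-contained.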

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bijanfahimi/storm STORM-KafkaRetry

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2282.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2282
    
----
commit cb46873c95d252db9a2c6d92fec7a1abdd5e67b6
Author: Bijan Fahimi <bi...@outlook>
Date:   2017-08-17T19:24:00Z

    Add KafkaTupleListener to storm-kafka-client

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r133973452
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java ---
    @@ -0,0 +1,35 @@
    +package org.apache.storm.kafka.spout;
    +
    +import java.util.List;
    +
    +public class EmptyKafkaTupleListener implements KafkaTupleListener {
    +    @Override
    +    public void onEmit(List<Object> tuple, KafkaSpoutMessageId msgId) {
    +        // empty method
    --- End diff --
    
    This comment isn't really necessary.


[GitHub] storm issue #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-client

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2282
  
    Thanks for making the changes. +1 once the license issue is fixed.


[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r134280097
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java ---
    @@ -0,0 +1,33 @@
    +package org.apache.storm.kafka.spout;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +
    +public class EmptyKafkaTupleListener implements KafkaTupleListener {
    +
    +    @Override
    +    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { }
    +
    +    @Override
    +    public void onEmit(List<Object> tuple, KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onAck(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onPartitionsReassigned(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onRetry(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onMaxRetryReached(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public String toString() {
    +        return "EmptyKafkaTupleListener";
    --- End diff --
    
    Makes sense


[GitHub] storm issue #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-client

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2282
  
    @bijanfahimi I think the last minor thing is the comment left by @hmcl here https://github.com/apache/storm/pull/2282#discussion_r135033155. I'll merge this tomorrow unless @hmcl objects.


[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by bijanfahimi <gi...@git.apache.org>.
Github user bijanfahimi commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r134996598
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout;
    +
    +import java.io.Serializable;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +
    +
    +/**
    + * The KafkaTupleListener handles state changes of a kafka tuple inside a KafkaSpout.
    + */
    +public interface KafkaTupleListener extends Serializable {
    +
    +
    +    void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector);
    --- End diff --
    
    I have updated the PR.


[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r135032704
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +
    +package org.apache.storm.kafka.spout;
    +
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +
    +public class EmptyKafkaTupleListener implements KafkaTupleListener {
    --- End diff --
    
    @bijanfahimi I don't think this class is necessary. We can put this default no-op behavior in the interface itself by making the interface methods `default` methods.
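    Here is a sketch of the default-method approach. It uses a hypothetical, trimmed-down `DefaultingTupleListener` rather than the actual KafkaTupleListener. Every callback defaults to a no-op. Implementations override only the callbacks they need, and no separate empty class is required.

```java
import java.util.List;

// Hypothetical, simplified listener whose callbacks default to no-ops,
// along the lines suggested above; this is not the final Storm interface.
interface DefaultingTupleListener {
    default void onEmit(List<Object> tuple, String msgId) { }

    default void onAck(String msgId) { }

    default void onRetry(String msgId) { }

    default void onMaxRetryReached(String msgId) { }
}

// An implementation only overrides the callback it cares about.
class RetryCounter implements DefaultingTupleListener {
    int retries = 0;

    @Override
    public void onRetry(String msgId) {
        retries++;
    }
}
```

    With this shape, a no-op listener can be written inline as `new DefaultingTupleListener() { }`.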


[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r134125813
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java ---
    @@ -0,0 +1,33 @@
    +package org.apache.storm.kafka.spout;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +
    +public class EmptyKafkaTupleListener implements KafkaTupleListener {
    +
    +    @Override
    +    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { }
    +
    +    @Override
    +    public void onEmit(List<Object> tuple, KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onAck(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onPartitionsReassigned(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onRetry(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onMaxRetryReached(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public String toString() {
    +        return "EmptyKafkaTupleListener";
    --- End diff --
    
    Nit: What is this toString for?


[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r133973275
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java ---
    @@ -0,0 +1,48 @@
    +package org.apache.storm.kafka.spout;
    +
    +import java.io.Serializable;
    +import java.util.List;
    +
    +
    +/**
    + * The KafkaTupleListener handles state changes of a kafka tuple inside a KafkaSpout.
    + */
    +public interface KafkaTupleListener extends Serializable {
    --- End diff --
    
    It might be useful to have a prepare method on this interface too, in case implementations need to instantiate something that can't be serialized, e.g. a KafkaProducer. I'm also wondering whether it would be useful to be able to emit dead tuples to a different stream.
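    The sketch below shows why a prepare/open hook helps. It assumes a hypothetical `DlqListener` in which a `transient` list stands in for a non-serializable client such as a KafkaProducer. The listener travels with the serialized spout, so its transient fields arrive as `null` on the worker and must be created in `open()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener: "sink" stands in for a client that cannot be
// serialized (such as a KafkaProducer), so it is transient and built in open().
class DlqListener implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String dlqTopic;        // plain configuration, serialized normally
    private transient List<String> sink;  // stand-in for a non-serializable client

    DlqListener(String dlqTopic) {
        this.dlqTopic = dlqTopic;
    }

    // Called on the worker after deserialization, like a spout's open().
    void open() {
        sink = new ArrayList<>();
    }

    void onMaxRetryReached(String msgId) {
        sink.add(dlqTopic + ":" + msgId);
    }

    boolean isOpen() {
        return sink != null;
    }

    List<String> sent() {
        return sink;
    }
}

// Simulates shipping the listener to a worker with Java serialization.
final class Shipping {
    private Shipping() { }

    static DlqListener roundTrip(DlqListener listener) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(listener);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DlqListener) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```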


[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by bijanfahimi <gi...@git.apache.org>.
Github user bijanfahimi commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r134127239
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -363,6 +369,7 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
                             } else {
                                 collector.emit(tuple, msgId);
                             }
    +                        tupleListener.onEmit(tuple, msgId);
    --- End diff --
    
    I have added additional comments.


[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by bijanfahimi <gi...@git.apache.org>.
Github user bijanfahimi commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r135051662
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +
    +package org.apache.storm.kafka.spout;
    +
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +
    +public class EmptyKafkaTupleListener implements KafkaTupleListener {
    --- End diff --
    
    It's now final.


[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r134880439
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout;
    +
    +import java.io.Serializable;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +
    +
    +/**
    + * The KafkaTupleListener handles state changes of a kafka tuple inside a KafkaSpout.
    + */
    +public interface KafkaTupleListener extends Serializable {
    +
    +
    +    void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector);
    --- End diff --
    
    I'm wondering whether we should pass along the output collector here. If we don't support retrying tuples emitted to an error stream, then I don't know how useful emitting to such a stream will be. What do you think?


[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r134125786
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java ---
    @@ -0,0 +1,33 @@
    +package org.apache.storm.kafka.spout;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +
    +public class EmptyKafkaTupleListener implements KafkaTupleListener {
    +
    +    @Override
    +    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { }
    +
    +    @Override
    +    public void onEmit(List<Object> tuple, KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onAck(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onPartitionsReassigned(KafkaSpoutMessageId msgId) { }
    --- End diff --
    
    I think this should take the list of assigned partitions as a parameter and be called in the onPartitionsAssigned method of the rebalance listener instead. That would let implementations easily delete any saved message ids that belong to revoked partitions.
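    The following sketches the suggested onPartitionsReassigned(assignedPartitions) shape. It assumes a hypothetical `PartitionAwareListener` that identifies partitions by "topic-partition" strings instead of Kafka's TopicPartition. On reassignment, the listener keeps only the state for partitions that are still assigned. A single call therefore drops anything saved for revoked partitions.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical listener that remembers in-flight offsets per partition.
// Partitions are "topic-partition" strings here, not Kafka's TopicPartition.
class PartitionAwareListener {
    private final Map<String, Set<Long>> pendingByPartition = new HashMap<>();

    void onEmit(String partition, long offset) {
        pendingByPartition.computeIfAbsent(partition, p -> new HashSet<>()).add(offset);
    }

    void onAck(String partition, long offset) {
        Set<Long> offsets = pendingByPartition.get(partition);
        if (offsets != null) {
            offsets.remove(offset);
        }
    }

    // Called with the newly assigned partitions after a rebalance. State for revoked
    // partitions is discarded, since another task may replay those records.
    void onPartitionsReassigned(Collection<String> assignedPartitions) {
        pendingByPartition.keySet().retainAll(new HashSet<>(assignedPartitions));
    }

    int pendingCount(String partition) {
        Set<Long> offsets = pendingByPartition.get(partition);
        return offsets == null ? 0 : offsets.size();
    }
}
```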


[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by bijanfahimi <gi...@git.apache.org>.
Github user bijanfahimi commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r134867571
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java ---
    @@ -0,0 +1,35 @@
    +package org.apache.storm.kafka.spout;
    --- End diff --
    
    I have added the license.


[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r135539518
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java ---
    @@ -0,0 +1,33 @@
    +package org.apache.storm.kafka.spout;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +
    +public class EmptyKafkaTupleListener implements KafkaTupleListener {
    +
    +    @Override
    +    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { }
    +
    +    @Override
    +    public void onEmit(List<Object> tuple, KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onAck(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onPartitionsReassigned(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onRetry(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onMaxRetryReached(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public String toString() {
    +        return "EmptyKafkaTupleListener";
    --- End diff --
    
    @hmcl At the risk of getting into bikeshedding territory, I think it makes sense to leave it this way, since the listener is being printed in the toString of KafkaSpoutConfig. As it is now, the print will show "tupleListener=EmptyKafkaTupleListener", so at least it is obvious what's in the field. Leaving it empty would show "tupleListener=".
    
    I'm happy to merge this either way though.
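    This sketch illustrates the toString point. It uses hypothetical stand-ins (`EmptyListener`, `NoToStringListener`, `SpoutConfigSketch`) rather than the real KafkaSpoutConfig. With the override, the config prints the listener by name. Without it, Java's inherited `Object.toString()` prints the class name followed by `@` and a hexadecimal hash code.

```java
// Hypothetical stand-ins for the listener and the spout config; only the
// toString behaviour discussed above is modelled.
interface TupleListenerStub { }

// Overrides toString, like EmptyKafkaTupleListener in this PR.
final class EmptyListener implements TupleListenerStub {
    @Override
    public String toString() {
        return "EmptyListener";
    }
}

// Relies on the inherited Object.toString().
final class NoToStringListener implements TupleListenerStub { }

final class SpoutConfigSketch {
    private final TupleListenerStub tupleListener;

    SpoutConfigSketch(TupleListenerStub tupleListener) {
        this.tupleListener = tupleListener;
    }

    @Override
    public String toString() {
        // String concatenation calls the listener's toString().
        return "SpoutConfigSketch{tupleListener=" + tupleListener + "}";
    }
}
```

    For example, `new SpoutConfigSketch(new EmptyListener())` prints `SpoutConfigSketch{tupleListener=EmptyListener}`.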


[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by bijanfahimi <gi...@git.apache.org>.
Github user bijanfahimi commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r134125532
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java ---
    @@ -0,0 +1,48 @@
    +package org.apache.storm.kafka.spout;
    +
    +import java.io.Serializable;
    +import java.util.List;
    +
    +
    +/**
    + * The KafkaTupleListener handles state changes of a kafka tuple inside a KafkaSpout.
    + */
    +public interface KafkaTupleListener extends Serializable {
    +
    +    /**
    +     * Called when the tuple is emitted.
    +     *
    +     * @param tuple the storm tuple.
    +     * @param msgId The id of the tuple in the spout.
    +     */
    +    void onEmit(List<Object> tuple, KafkaSpoutMessageId msgId);
    +
    +
    +    /**
    +     * Called when a tuple is acked.
    +     *
    +     * @param msgId The id of the tuple in the spout.
    +     */
    +    void onAck(KafkaSpoutMessageId msgId);
    +
    +    /**
    +     * Called when a fail reaches the spout, but the Kafka record does not belong to the spout anymore.
    --- End diff --
    
    I changed the name of the method and updated the comment to reflect your remarks. You were right about the onPartitionsReassigned. That was what I intended with the call.


[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r133971254
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java ---
    @@ -0,0 +1,48 @@
    +package org.apache.storm.kafka.spout;
    +
    +import java.io.Serializable;
    +import java.util.List;
    +
    +
    +/**
    + * The KafkaTupleListener handles state changes of a kafka tuple inside a KafkaSpout.
    + */
    +public interface KafkaTupleListener extends Serializable {
    +
    +    /**
    +     * Called when the tuple is emitted.
    +     *
    +     * @param tuple the storm tuple.
    +     * @param msgId The id of the tuple in the spout.
    +     */
    +    void onEmit(List<Object> tuple, KafkaSpoutMessageId msgId);
    +
    +
    +    /**
    +     * Called when a tuple is acked.
    +     *
    +     * @param msgId The id of the tuple in the spout.
    +     */
    +    void onAck(KafkaSpoutMessageId msgId);
    +
    +    /**
    +     * Called when a fail reaches the spout, but the Kafka record does not belong to the spout anymore.
    --- End diff --
    
    Nit: The Kafka record belongs to a partition that is not assigned to the spout anymore.
    I'm wondering what this hook would be useful for. The record will get replayed by a different task if using at-least-once; otherwise it'll be dropped. If the reason it is here is to allow implementations to clean up after emit, I think we should just have a general onPartitionsReassigned(assignedPartitions) instead.


[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by bijanfahimi <gi...@git.apache.org>.
Github user bijanfahimi commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r134125454
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java ---
    @@ -0,0 +1,48 @@
    +package org.apache.storm.kafka.spout;
    +
    +import java.io.Serializable;
    +import java.util.List;
    +
    +
    +/**
    + * The KafkaTupleListener handles state changes of a kafka tuple inside a KafkaSpout.
    + */
    +public interface KafkaTupleListener extends Serializable {
    --- End diff --
    
    Good idea. I have added a prepare method, named "open" since it is in the context of a spout.

    I am not sure what you mean by "dead tuples".
    Do you mean the case where `isEmitTuple(tuple)` is false at line 380 of the KafkaSpout's `emitTupleIfNotEmitted`? In that case I would have to change the onAck call as well, since it would otherwise be called twice. I can imagine that someone needs more callbacks; however, I tried to keep the interface simple and small. If you wish, I would gladly add new functionality to the interface.


[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r134980276
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout;
    +
    +import java.io.Serializable;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +
    +
    +/**
    + * The KafkaTupleListener handles state changes of a kafka tuple inside a KafkaSpout.
    + */
    +public interface KafkaTupleListener extends Serializable {
    +
    +
    +    void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector);
    --- End diff --
    
    I'm leaning towards removing it too. We can always add in error stream support later if someone needs it.


[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r134280004
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java ---
    @@ -0,0 +1,35 @@
    +package org.apache.storm.kafka.spout;
    --- End diff --
    
    You need to add the Apache license to the top of the new files. You can just copy it from one of the other ones.



[GitHub] storm issue #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-client

Posted by bijanfahimi <gi...@git.apache.org>.
Github user bijanfahimi commented on the issue:

    https://github.com/apache/storm/pull/2282
  
    I have updated the PR according to the comments. The checkstyle errors should be fixed. If there is anything else, please let me know.



[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2282



[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by bijanfahimi <gi...@git.apache.org>.
Github user bijanfahimi commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r134127249
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java ---
    @@ -0,0 +1,33 @@
    +package org.apache.storm.kafka.spout;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +
    +public class EmptyKafkaTupleListener implements KafkaTupleListener {
    +
    +    @Override
    +    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { }
    +
    +    @Override
    +    public void onEmit(List<Object> tuple, KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onAck(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onPartitionsReassigned(KafkaSpoutMessageId msgId) { }
    --- End diff --
    
    Done.



[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r135033155
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java ---
    @@ -0,0 +1,33 @@
    +package org.apache.storm.kafka.spout;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +
    +public class EmptyKafkaTupleListener implements KafkaTupleListener {
    +
    +    @Override
    +    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { }
    +
    +    @Override
    +    public void onEmit(List<Object> tuple, KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onAck(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onPartitionsReassigned(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onRetry(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onMaxRetryReached(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public String toString() {
    +        return "EmptyKafkaTupleListener";
    --- End diff --
    
    If there are no config fields to show, in my opinion this method should simply return an empty String.
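A minimal sketch contrasting the two positions. The class names and the deadLetterTopic field are hypothetical, and the listener interface itself is left out so the example compiles on its own. A listener with configuration reports it from toString, in the style of KafkaSpoutConfig printing its config, while a field-less no-op returns an empty String as suggested here.

```java
import java.io.Serializable;

// Hypothetical listener with one configuration field. Its toString reports the
// field, in the spirit of KafkaSpoutConfig printing its current config.
final class DeadLetterTopicListener implements Serializable {
    private static final long serialVersionUID = 1L;

    // Hypothetical config field: where discarded tuples would be sent.
    private final String deadLetterTopic;

    DeadLetterTopicListener(String deadLetterTopic) {
        this.deadLetterTopic = deadLetterTopic;
    }

    @Override
    public String toString() {
        return "DeadLetterTopicListener{deadLetterTopic=" + deadLetterTopic + "}";
    }
}

// Field-less no-op listener: with no config to show, toString returns "".
final class NoOpListener implements Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public String toString() {
        return "";
    }
}
```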



[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r135038754
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +
    +package org.apache.storm.kafka.spout;
    +
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +
    +public class EmptyKafkaTupleListener implements KafkaTupleListener {
    --- End diff --
    
    I think it is nicer to leave the empty implementation as a separate thing from the interface, so other implementations have to implement all methods. Maybe make this one final too.
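A minimal sketch of that layout, assuming a simplified stand-in for the interface: String replaces KafkaSpoutMessageId and TopicPartition, and the TopologyContext parameter is dropped so the example compiles without Storm. The no-op class is final and kept apart from the interface, so any real listener has to implement every callback itself.

```java
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Simplified stand-in for KafkaTupleListener (hypothetical types, no Storm dependency).
interface TupleListener extends Serializable {
    void open(Map<String, Object> conf);
    void onEmit(List<Object> tuple, String msgId);
    void onAck(String msgId);
    void onPartitionsReassigned(Collection<String> partitions);
    void onRetry(String msgId);
    void onMaxRetryReached(String msgId);
}

// No-op default kept separate from the interface. Declared final so it cannot be
// subclassed; custom listeners implement TupleListener directly and therefore
// must provide every callback.
final class EmptyTupleListener implements TupleListener {
    private static final long serialVersionUID = 1L;

    @Override public void open(Map<String, Object> conf) { }
    @Override public void onEmit(List<Object> tuple, String msgId) { }
    @Override public void onAck(String msgId) { }
    @Override public void onPartitionsReassigned(Collection<String> partitions) { }
    @Override public void onRetry(String msgId) { }
    @Override public void onMaxRetryReached(String msgId) { }
}
```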



[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r134125885
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -363,6 +369,7 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
                             } else {
                                 collector.emit(tuple, msgId);
                             }
    +                        tupleListener.onEmit(tuple, msgId);
    --- End diff --
    
    Is it intended that this isn't called in auto commit mode? I would understand why we wouldn't want to call it there, but I think we should maybe add a note to the interface so it's obvious to users.



[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by bijanfahimi <gi...@git.apache.org>.
Github user bijanfahimi commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r134977497
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout;
    +
    +import java.io.Serializable;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +
    +
    +/**
    + * The KafkaTupleListener handles state changes of a kafka tuple inside a KafkaSpout.
    + */
    +public interface KafkaTupleListener extends Serializable {
    +
    +
    +    void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector);
    --- End diff --
    
    That's a good point. As I see it, there are two options: first, add a hook for declareOutputFields to allow emitting to an error stream; or second, as you suggested, remove the SpoutOutputCollector from the open parameters.
    
    I lean towards your suggestion. I think that when this interface is capable of emitting, it acts like a spout itself and not like a simple listener.
    
    If more people need to emit to a user-specific error stream, the interface can be expanded afterwards, I guess.
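A sketch of the second option, assuming hypothetical stand-in types (String for KafkaSpoutMessageId and TopicPartition, no TopologyContext). Here open() receives no collector, so an implementation can only observe tuple events, as the counting listener below does. The Javadoc on onEmit reflects the auto-commit question raised in review; its wording is illustrative, not the merged text.

```java
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified stand-in for KafkaTupleListener whose open() takes no collector. */
interface TupleListener extends Serializable {
    /** Called once when the spout opens. No collector is passed, so the listener cannot emit. */
    void open(Map<String, Object> conf);

    /** Called after the spout emits a tuple. Should state whether it runs in auto-commit mode. */
    void onEmit(List<Object> tuple, String msgId);

    void onAck(String msgId);

    void onPartitionsReassigned(Collection<String> partitions);

    void onRetry(String msgId);

    void onMaxRetryReached(String msgId);
}

// A pure observer: it only counts lifecycle events and never emits.
final class CountingListener implements TupleListener {
    private static final long serialVersionUID = 1L;

    // transient: created in open(), after the listener has been deserialized on the worker.
    private transient Map<String, Integer> counts;

    @Override public void open(Map<String, Object> conf) { counts = new HashMap<>(); }
    @Override public void onEmit(List<Object> tuple, String msgId) { bump("emit"); }
    @Override public void onAck(String msgId) { bump("ack"); }
    @Override public void onPartitionsReassigned(Collection<String> partitions) { bump("reassign"); }
    @Override public void onRetry(String msgId) { bump("retry"); }
    @Override public void onMaxRetryReached(String msgId) { bump("maxRetry"); }

    int count(String event) {
        return counts.getOrDefault(event, 0);
    }

    private void bump(String event) {
        counts.merge(event, 1, Integer::sum);
    }
}
```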



[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by bijanfahimi <gi...@git.apache.org>.
Github user bijanfahimi commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r135037987
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +
    +package org.apache.storm.kafka.spout;
    +
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +
    +public class EmptyKafkaTupleListener implements KafkaTupleListener {
    --- End diff --
    
    @hmcl Default methods in interfaces are a Java 8 feature. If Storm requires Java 8 or later, this class is indeed not needed. I haven't checked.
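For comparison, a sketch of the Java 8 variant with default methods, again using hypothetical stand-in types (String instead of KafkaSpoutMessageId and TopicPartition, no TopologyContext). Each callback defaults to a no-op, so an implementation overrides only what it needs.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Java 8 alternative: every callback has a no-op default, so no separate
// empty implementation class is required.
interface TupleListener extends Serializable {
    default void open(Map<String, Object> conf) { }
    default void onEmit(List<Object> tuple, String msgId) { }
    default void onAck(String msgId) { }
    default void onPartitionsReassigned(Collection<String> partitions) { }
    default void onRetry(String msgId) { }
    default void onMaxRetryReached(String msgId) { }
}

// Overrides only the one callback it cares about.
final class MaxRetryOnlyListener implements TupleListener {
    private static final long serialVersionUID = 1L;
    private final List<String> exhausted = new ArrayList<>();

    @Override
    public void onMaxRetryReached(String msgId) {
        exhausted.add(msgId);
    }

    List<String> exhausted() {
        return exhausted;
    }
}
```

The trade-off is the one raised earlier in review: with defaults, a callback added to the interface later silently does nothing in existing implementations, whereas a separate empty class forces implementers to handle every method.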



[GitHub] storm issue #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-client

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2282
  
    Thanks @bijanfahimi, I merged to master. Cherry-picking to 1.x-branch hit some conflicts, so please open a PR against that branch as well if you'd like this to go in those releases.



[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by bijanfahimi <gi...@git.apache.org>.
Github user bijanfahimi commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r134126020
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java ---
    @@ -0,0 +1,33 @@
    +package org.apache.storm.kafka.spout;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +
    +public class EmptyKafkaTupleListener implements KafkaTupleListener {
    +
    +    @Override
    +    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { }
    +
    +    @Override
    +    public void onEmit(List<Object> tuple, KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onAck(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onPartitionsReassigned(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onRetry(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public void onMaxRetryReached(KafkaSpoutMessageId msgId) { }
    +
    +    @Override
    +    public String toString() {
    +        return "EmptyKafkaTupleListener";
    --- End diff --
    
    KafkaSpoutConfig implements the toString method, which prints the current config. I added it here for consistency.



[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r133974197
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -73,7 +73,9 @@
         // Strategy to determine the fetch offset of the first realized by the spout upon activation
         private transient FirstPollOffsetStrategy firstPollOffsetStrategy;  
         // Class that has the logic to handle tuple failure
    -    private transient KafkaSpoutRetryService retryService;              
    +    private transient KafkaSpoutRetryService retryService;
    +    // Handles the events off a tuple
    --- End diff --
    
    Nit: Rephrase as "Handles tuple events (emit, ack etc.)"; I don't think it is clear what this means.



[GitHub] storm issue #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-client

Posted by bijanfahimi <gi...@git.apache.org>.
Github user bijanfahimi commented on the issue:

    https://github.com/apache/storm/pull/2282
  
    @hmcl and @srdo, is there anything else I can do?



[GitHub] storm issue #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-client

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2282
  
    +1



[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2282#discussion_r134125713
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java ---
    @@ -0,0 +1,48 @@
    +package org.apache.storm.kafka.spout;
    +
    +import java.io.Serializable;
    +import java.util.List;
    +
    +
    +/**
    + * The KafkaTupleListener handles state changes of a kafka tuple inside a KafkaSpout.
    + */
    +public interface KafkaTupleListener extends Serializable {
    --- End diff --
    
    I meant being able to emit the tuples that have exceeded their retry limit (those that hit onMaxRetryReached) to a stream. It's probably not a great idea, since we'd need to consider processing guarantees for those too, so never mind :)
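Without an error stream, the PR's stated use case (putting tuples discarded by the retry service into a dead letter queue) could be handled inside a listener. A minimal sketch under stated assumptions: the stand-in interface uses String message ids; onMaxRetryReached receives only the id (as in the diff), so the hypothetical DeadLetterListener keeps emitted tuples keyed by id until they are acked or exhausted; and an in-memory queue stands in for a real dead letter destination such as a separate Kafka topic.

```java
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Simplified stand-in for KafkaTupleListener (String ids, no TopologyContext).
interface TupleListener extends Serializable {
    void open(Map<String, Object> conf);
    void onEmit(List<Object> tuple, String msgId);
    void onAck(String msgId);
    void onPartitionsReassigned(Collection<String> partitions);
    void onRetry(String msgId);
    void onMaxRetryReached(String msgId);
}

// Hypothetical listener that moves tuples which exhausted their retries to a dead letter queue.
final class DeadLetterListener implements TupleListener {
    private static final long serialVersionUID = 1L;

    // Emitted tuples that are not yet acked or exhausted, keyed by message id.
    private transient Map<String, List<Object>> inFlight;
    // In-memory stand-in for a real dead letter destination.
    private transient Queue<List<Object>> deadLetters;

    @Override
    public void open(Map<String, Object> conf) {
        inFlight = new HashMap<>();
        deadLetters = new ArrayDeque<>();
    }

    @Override
    public void onEmit(List<Object> tuple, String msgId) {
        inFlight.put(msgId, tuple); // a re-emit with the same id just overwrites
    }

    @Override
    public void onAck(String msgId) {
        inFlight.remove(msgId); // processed successfully; nothing to dead-letter
    }

    @Override
    public void onPartitionsReassigned(Collection<String> partitions) {
        // No-op here; a fuller version would drop entries for revoked partitions.
    }

    @Override
    public void onRetry(String msgId) {
        // Keep the stored tuple until it is acked or runs out of retries.
    }

    @Override
    public void onMaxRetryReached(String msgId) {
        List<Object> tuple = inFlight.remove(msgId);
        if (tuple != null) {
            deadLetters.add(tuple);
        }
    }

    Queue<List<Object>> deadLetters() {
        return deadLetters;
    }
}
```

Holding every in-flight tuple costs memory proportional to the number of pending tuples, which is one reason such a listener stays outside the spout's own retry bookkeeping in this sketch.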

