Posted to dev@storm.apache.org by srdo <gi...@git.apache.org> on 2016/09/20 14:55:36 UTC

[GitHub] storm pull request #1696: STORM-2104: More graceful handling of acked/failed...

GitHub user srdo opened a pull request:

    https://github.com/apache/storm/pull/1696

    STORM-2104: More graceful handling of acked/failed tuples after partition reassignment in new Kafka spout
    
    See https://issues.apache.org/jira/browse/STORM-2104
    
    In order to test this change I added a factory for KafkaConsumers. Please let me know if there's a nicer way to mock it.
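A factory like the one mentioned above lets a test hand the spout a fake consumer instead of a real KafkaConsumer. The sketch below is a minimal, self-contained illustration of that pattern, not the PR's code: `ConsumerFactory`, `SimpleConsumer`, `FakeConsumer`, and `Spout` are hypothetical names, and `SimpleConsumer` is a heavily simplified stand-in for Kafka's consumer so the example runs without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ConsumerFactoryDemo {
    // Hypothetical, heavily simplified stand-in for Kafka's consumer interface.
    interface SimpleConsumer {
        List<String> poll();
    }

    // Hypothetical factory: production code would return a real KafkaConsumer here.
    interface ConsumerFactory {
        SimpleConsumer createConsumer();
    }

    // Test double that returns canned records and counts poll() calls.
    static class FakeConsumer implements SimpleConsumer {
        int pollCount = 0;
        private final List<String> records;

        FakeConsumer(List<String> records) {
            this.records = records;
        }

        @Override
        public List<String> poll() {
            pollCount++;
            return records;
        }
    }

    // Hypothetical spout that never constructs its consumer directly.
    static class Spout {
        private final ConsumerFactory factory;
        private SimpleConsumer consumer;
        final List<String> emitted = new ArrayList<>();

        Spout(ConsumerFactory factory) {
            this.factory = factory;
        }

        void open() {
            // Creation is deferred to open(); this sketch does not model Storm serializing the spout.
            consumer = factory.createConsumer();
        }

        void nextTuple() {
            emitted.addAll(consumer.poll());
        }
    }

    public static void main(String[] args) {
        FakeConsumer fake = new FakeConsumer(Arrays.asList("a", "b"));
        Spout spout = new Spout(() -> fake);
        spout.open();
        spout.nextTuple();
        System.out.println(spout.emitted + " after " + fake.pollCount + " poll(s)");
    }
}
```

Injecting the factory through the constructor keeps the spout's production path unchanged while letting a unit test assert on exactly how often the consumer was polled.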
    
    In addition to fixing the described issue, I changed a few types on KafkaSpoutConfig. If the user specifies a non-serializable Deserializer in either setter in KafkaSpoutConfig.Builder, the topology can't start because Nimbus can't serialize KafkaSpoutConfig.
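The type change described above can be sketched as follows: if the builder setters only accept a deserializer type that also extends `Serializable`, passing a non-serializable deserializer becomes a compile-time error instead of a failure at topology submission. This is a minimal sketch under stated assumptions: `Deserializer` is a simplified stand-in for Kafka's interface (which also declares `configure` and `close`), and `SpoutConfig`, `Builder`, `Utf8Deserializer`, and `roundTrip` are hypothetical names, not the real `KafkaSpoutConfig` API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class SerializableConfigDemo {
    // Simplified stand-in for org.apache.kafka.common.serialization.Deserializer.
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    // Same shape as the SerializableDeserializer interface added in the PR.
    interface SerializableDeserializer<T> extends Deserializer<T>, Serializable {
    }

    static class Utf8Deserializer implements SerializableDeserializer<String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String deserialize(String topic, byte[] data) {
            return data == null ? null : new String(data, StandardCharsets.UTF_8);
        }
    }

    // Hypothetical config: serializable because every field is serializable.
    static class SpoutConfig<V> implements Serializable {
        private static final long serialVersionUID = 1L;
        final SerializableDeserializer<V> valueDeserializer;

        private SpoutConfig(Builder<V> builder) {
            this.valueDeserializer = builder.valueDeserializer;
        }
    }

    static class Builder<V> {
        private SerializableDeserializer<V> valueDeserializer;

        // Narrowed parameter type: a plain, non-serializable Deserializer no longer compiles here.
        Builder<V> setValueDeserializer(SerializableDeserializer<V> deserializer) {
            this.valueDeserializer = deserializer;
            return this;
        }

        SpoutConfig<V> build() {
            return new SpoutConfig<>(this);
        }
    }

    // Java-serializes and deserializes an object, roughly what shipping a topology involves.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SpoutConfig<String> config = new Builder<String>()
                .setValueDeserializer(new Utf8Deserializer())
                .build();
        SpoutConfig<String> copy = roundTrip(config);
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(copy.valueDeserializer.deserialize("topic", payload));
    }
}
```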
    
    I borrowed a few classes from https://github.com/apache/storm/pull/1679. I hope that's okay with you @jfenc91. 


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/storm STORM-2104

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1696.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1696
    
----
commit d319b0a68ac6537d6e9561da8e90b693c2b9f990
Author: Stig Rohde Døssing <st...@gmail.com>
Date:   2016-09-20T11:53:55Z

    STORM-2104: More graceful handling of acked/failed tuples after partition reassignment in new Kafka spout

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1696: STORM-2104: More graceful handling of acked/failed...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1696#discussion_r80026248
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java ---
    @@ -0,0 +1,25 @@
    +/*
    + * Copyright 2016 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.kafka.spout;
    +
    +import java.io.Serializable;
    +import org.apache.kafka.common.serialization.Deserializer;
    +
    +/**
    + * @param <T> The type this deserializer deserializes to.
    + */
    +public interface SerializableDeserializer<T> extends Deserializer<T>, Serializable { 
    --- End diff --
    
    I thought it was nice to have, since setKey/ValueDeserializer in the builder implicitly require the deserializer to be serializable. For example, if you try to set the standard Kafka StringDeserializer via those methods, you'll get a NotSerializableException when the topology is submitted to Storm, because the deserializers are stored as fields of the KafkaSpoutConfig, which is itself held in a final field of KafkaSpout and is serialized along with the spout.
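The failure described above can be reproduced without Storm or Kafka. In this minimal sketch, `PlainStringDeserializer` plays the role of Kafka's `StringDeserializer` (which does not implement `Serializable`), and `SpoutConfig` is a hypothetical stand-in for the config object the spout holds. Java serialization of the config fails as soon as it reaches the non-serializable field, while the same deserializer marked `Serializable` goes through.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class NotSerializableDemo {
    // Simplified stand-in for Kafka's Deserializer interface.
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    // Like Kafka's StringDeserializer: implements Deserializer but not Serializable.
    static class PlainStringDeserializer implements Deserializer<String> {
        @Override
        public String deserialize(String topic, byte[] data) {
            return data == null ? null : new String(data, StandardCharsets.UTF_8);
        }
    }

    // Same behaviour, but additionally marked Serializable.
    static class SerializableStringDeserializer extends PlainStringDeserializer implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    // Hypothetical config held by a spout: the class is Serializable, but its field may not be.
    static class SpoutConfig implements Serializable {
        private static final long serialVersionUID = 1L;
        final Deserializer<String> valueDeserializer;

        SpoutConfig(Deserializer<String> valueDeserializer) {
            this.valueDeserializer = valueDeserializer;
        }
    }

    // Returns the exception message if serialization fails, or null if it succeeds.
    static String findNonSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage(); // by default, the name of the offending class
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(findNonSerializable(new SpoutConfig(new PlainStringDeserializer())));
        System.out.println(findNonSerializable(new SpoutConfig(new SerializableStringDeserializer())));
    }
}
```

Note that the field's declared type does not matter to Java serialization; only the runtime class of the stored object does, which is why the problem only shows up at submission time.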


[GitHub] storm issue #1696: STORM-2104: More graceful handling of acked/failed tuples...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1696
  
    @hmcl ping. Have you had a chance to look at this? It would be nice to get it merged soon.


[GitHub] storm issue #1696: STORM-2104: More graceful handling of acked/failed tuples...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1696
  
    The test failure is due to Maven failing to download dependencies for storm-core. The !storm-core build passed.


[GitHub] storm pull request #1696: STORM-2104: More graceful handling of acked/failed...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1696#discussion_r91569248
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java ---
    @@ -0,0 +1,25 @@
    +package org.apache.storm.kafka.spout;
    +
    +import java.io.Serializable;
    +import org.apache.kafka.common.serialization.Deserializer;
    +
    +/**
    + * @param <T> The type this deserializer deserializes to.
    + */
    +public interface SerializableDeserializer<T> extends Deserializer<T>, Serializable { 
    --- End diff --
    
    Good point. I think I am going to need to fix that in my patch.


[GitHub] storm issue #1696: STORM-2104: More graceful handling of acked/failed tuples...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1696
  
    @hmcl Sure thing.


[GitHub] storm issue #1696: STORM-2104: More graceful handling of acked/failed tuples...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1696
  
    @revans2 ping for review if you have time. I'd like to get this in before too long if possible.


[GitHub] storm pull request #1696: STORM-2104: More graceful handling of acked/failed...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1696


[GitHub] storm issue #1696: STORM-2104: More graceful handling of acked/failed tuples...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1696
  
    @srdo @jfenc91 I am on vacation this week (with limited Internet access) and will be back on Monday. Can we please hold off on merging this until I can finish my review? I implemented the original patch and would like to review these changes. Thanks.


[GitHub] storm pull request #1696: STORM-2104: More graceful handling of acked/failed...

Posted by jfenc91 <gi...@git.apache.org>.
Github user jfenc91 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1696#discussion_r79973663
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -145,6 +154,10 @@ private void initialize(Collection<TopicPartition> partitions) {
                 }
     
                 retryService.retainAll(partitions);
    +            
    +            //Emitted messages for partitions that are no longer assigned to this spout can't be acked, and they shouldn't be retried. Remove them from emitted.
    +            Set<TopicPartition> partitionsSet = new HashSet(partitions);
    +            emitted.removeIf((msgId) -> !partitionsSet.contains(msgId.getTopicPartition()));
    --- End diff --
    
    This looks good. I think the same logic may be needed in onPartitionsRevoked as well. I also believe the messages may need to be removed from the retryService. Please correct me if I am wrong!
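The pruning step in the diff above keeps only message IDs whose partition is still assigned to this spout, and the question is whether pending retries need the same treatment. The sketch below shows both operations on plain collections. It is an illustration under assumptions, not the spout's code: `Partition` and `MessageId` are hypothetical stand-ins for Kafka's `TopicPartition` and the spout's message-ID class, and the retry service is modelled as a simple set called `retryQueue`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class PrunePartitionsDemo {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition (value equality).
    static final class Partition {
        final String topic;
        final int partition;

        Partition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Partition)) {
                return false;
            }
            Partition other = (Partition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Hypothetical stand-in for the spout's message ID: a partition plus an offset.
    static final class MessageId {
        final Partition topicPartition;
        final long offset;

        MessageId(Partition topicPartition, long offset) {
            this.topicPartition = topicPartition;
            this.offset = offset;
        }

        Partition getTopicPartition() {
            return topicPartition;
        }
    }

    final Set<MessageId> emitted = new HashSet<>();
    final Set<MessageId> retryQueue = new HashSet<>();

    // Called with the partitions assigned after a rebalance.
    void onAssigned(Collection<Partition> partitions) {
        Set<Partition> assigned = new HashSet<>(partitions);
        // Tuples from partitions this spout lost can no longer be acked or retried here.
        emitted.removeIf(msgId -> !assigned.contains(msgId.getTopicPartition()));
        retryQueue.removeIf(msgId -> !assigned.contains(msgId.getTopicPartition()));
    }

    public static void main(String[] args) {
        Partition p0 = new Partition("events", 0);
        Partition p1 = new Partition("events", 1);
        PrunePartitionsDemo spout = new PrunePartitionsDemo();
        spout.emitted.addAll(Arrays.asList(new MessageId(p0, 10), new MessageId(p1, 20)));
        spout.retryQueue.add(new MessageId(p1, 15));

        spout.onAssigned(Arrays.asList(p0)); // partition 1 moved to another spout instance
        System.out.println(spout.emitted.size() + " emitted, " + spout.retryQueue.size() + " awaiting retry");
    }
}
```

Copying the assignment into a `HashSet` first keeps each `contains` check constant-time, which matters when many tuples are in flight.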


[GitHub] storm issue #1696: STORM-2104: More graceful handling of acked/failed tuples...

Posted by qiozas <gi...@git.apache.org>.
Github user qiozas commented on the issue:

    https://github.com/apache/storm/pull/1696
  
    @srdo thank you very much for your offer.
    
    We have run many tests with the 1.0.2 and 1.0.3 releases to uncover any problems migrating from 0.9.6 to the new version. The issue addressed here is our main problem.
    
    Additionally, Storm 1.1.0 will be the first release to use the Kafka 0.10 API, so I am not very confident using it in a production system. We would prefer the Kafka 0.10 API, but it is still very new in the Storm world (Storm is actually holding us back on the Kafka 0.9 API, but we can live with that for some time).
    If it is not too much trouble, it would be very helpful if you could merge this fix into the 1.0.x branch as well, so that we can use it in our code.



[GitHub] storm issue #1696: STORM-2104: More graceful handling of acked/failed tuples...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1696
  
    This looks good to me. Now that I have gone through the Kafka spout code for my other pull request, I am confident giving this a +1.


[GitHub] storm issue #1696: STORM-2104: More graceful handling of acked/failed tuples...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1696
  
    @qiozas If I recall correctly, the Kafka 0.10 API changes were more or less a one-liner for the spout, so updating shouldn't be a big risk. 0.9 and 0.10 have the same API.
    
    I'll take a look at backporting this soon.


[GitHub] storm issue #1696: STORM-2104: More graceful handling of acked/failed tuples...

Posted by qiozas <gi...@git.apache.org>.
Github user qiozas commented on the issue:

    https://github.com/apache/storm/pull/1696
  
    Do you plan to port this to the 1.0.x branch too?
    We are facing the same issue on the 1.0.3 release (upgrading from 0.9.6).


[GitHub] storm issue #1696: STORM-2104: More graceful handling of acked/failed tuples...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1696
  
    @qiozas If you really need it on 1.0.x, then I wouldn't mind porting it. It seems like 1.1.0 is right around the corner though (RCs are being tested), so it might be faster for you to upgrade when that comes out, since a backport would have to wait for another 1.0.x release?


[GitHub] storm issue #1696: STORM-2104: More graceful handling of acked/failed tuples...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1696
  
    @hmcl @revans2 @knusbaum Could you guys take a look at this at some point? I noticed that two new PRs (https://github.com/apache/storm/pull/1752, https://github.com/apache/storm/pull/1753) were opened to solve the same problem, and this one has been kind of radio silent for a while.


[GitHub] storm pull request #1696: STORM-2104: More graceful handling of acked/failed...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1696#discussion_r79800937
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java ---
    @@ -0,0 +1,25 @@
    +package org.apache.storm.kafka.spout;
    +
    +import java.io.Serializable;
    +import org.apache.kafka.common.serialization.Deserializer;
    +
    +/**
    + * @param <T> The type this deserializer deserializes to.
    + */
    +public interface SerializableDeserializer<T> extends Deserializer<T>, Serializable { 
    --- End diff --
    
    Why do we need this wrapper marking interface?


[GitHub] storm pull request #1696: STORM-2104: More graceful handling of acked/failed...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1696#discussion_r79992419
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -145,6 +154,10 @@ private void initialize(Collection<TopicPartition> partitions) {
                 }
     
                 retryService.retainAll(partitions);
    +            
    +            //Emitted messages for partitions that are no longer assigned to this spout can't be acked, and they shouldn't be retried. Remove them from emitted.
    +            Set<TopicPartition> partitionsSet = new HashSet(partitions);
    +            emitted.removeIf((msgId) -> !partitionsSet.contains(msgId.getTopicPartition()));
    --- End diff --
    
    The messages should already be getting removed from retryService on line 156 (retryService.retainAll(partitions)). My impression is that onPartitionsAssigned is called immediately after onPartitionsRevoked, before the current call to poll returns (see https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html).
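The argument above rests on the callback order described in the Kafka javadoc linked there: rebalance callbacks run on the polling thread as part of `poll`, with `onPartitionsRevoked` invoked before `onPartitionsAssigned`. The simulation below models only that ordering, to show why pruning in the assignment callback leaves no stale state for the code that runs after `poll` returns. `FakeConsumer`, `Listener`, `Spout`, and the string partition names are hypothetical; this is not Kafka's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RebalanceOrderDemo {
    // Hypothetical listener with the same two callbacks as Kafka's ConsumerRebalanceListener.
    interface Listener {
        void onPartitionsRevoked(Collection<String> partitions);
        void onPartitionsAssigned(Collection<String> partitions);
    }

    // Hypothetical consumer: a pending rebalance is applied inside poll(), revoke first, then assign.
    static class FakeConsumer {
        private final Listener listener;
        private Set<String> assignment;
        private Set<String> pendingAssignment;

        FakeConsumer(Listener listener, Collection<String> initialAssignment) {
            this.listener = listener;
            this.assignment = new HashSet<>(initialAssignment);
        }

        void scheduleRebalance(Collection<String> newAssignment) {
            pendingAssignment = new HashSet<>(newAssignment);
        }

        void poll() {
            if (pendingAssignment != null) {
                listener.onPartitionsRevoked(new HashSet<>(assignment));
                assignment = pendingAssignment;
                pendingAssignment = null;
                listener.onPartitionsAssigned(new HashSet<>(assignment));
            }
            // Fetching records would happen here, after both callbacks have run.
        }
    }

    static class Spout implements Listener {
        final List<String> events = new ArrayList<>();
        // Partitions that still have emitted, un-acked tuples.
        final Set<String> inFlightPartitions = new HashSet<>(Arrays.asList("p0", "p1"));

        @Override
        public void onPartitionsRevoked(Collection<String> partitions) {
            events.add("revoked " + partitions.size());
        }

        @Override
        public void onPartitionsAssigned(Collection<String> partitions) {
            // In this model the spout's own code cannot run between the two callbacks.
            inFlightPartitions.retainAll(partitions);
            events.add("assigned " + partitions.size());
        }
    }

    public static void main(String[] args) {
        Spout spout = new Spout();
        FakeConsumer consumer = new FakeConsumer(spout, Arrays.asList("p0", "p1"));
        consumer.scheduleRebalance(Arrays.asList("p0"));
        consumer.poll();
        System.out.println(spout.events + " " + spout.inFlightPartitions);
    }
}
```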

