Posted to dev@storm.apache.org by reiabreu <gi...@git.apache.org> on 2018/03/17 11:12:33 UTC

[GitHub] storm pull request #2593: STORM-2994 KafkaSpout commit offsets for null tupl...

GitHub user reiabreu opened a pull request:

    https://github.com/apache/storm/pull/2593

    STORM-2994 KafkaSpout commit offsets for null tuples

    Hello,
    
    Let's kick off this pull request.
    Unit tests for null tuples were missing; I'm in the process of adding them.
    I'll update this PR asap. In the meantime, these seem to be the necessary changes.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/reiabreu/storm master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2593.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2593
    
----
commit 1db31bcf784a69d3904f39d86d8987f3c9e5e4bd
Author: Rui Abreu <ru...@...>
Date:   2018-03-16T12:50:11Z

    null tuples are now marked as emitted and marked as acked

commit 59235e79d64b8888b0190bf1b9364066215f3650
Author: Rui Abreu <ru...@...>
Date:   2018-03-17T11:09:08Z

    Undoing import statements changes

----


---

[GitHub] storm pull request #2593: STORM-2994 KafkaSpout commit offsets for null tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2593#discussion_r178551488
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -570,20 +572,25 @@ public void ack(Object messageId) {
             // Only need to keep track of acked tuples if commits to Kafka are controlled by
             // tuple acks, which happens only for at-least-once processing semantics
             final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    -        if (!emitted.contains(msgId)) {
    -            if (msgId.isEmitted()) {
    +        if (!msgId.isNullTuple()) {
    --- End diff --
    
    Nit: You can reduce the nesting a bit here by switching this to a guard clause, e.g.
    
    ```
    if (msgId.isNullTuple()) {
      ...
      return;
    }
    if (emitted.contains(msgId)) {
      ...
    } else {
      ...
    }
    ```
    
    Up to you whether you want to change it.
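
    To make the guard-clause shape concrete, here is a minimal, self-contained sketch. SketchMessageId and AckGuardSketch are hypothetical stand-ins, not KafkaSpout's real types: it assumes a single partition, offsets as plain longs, and a sorted set in place of the spout's offset bookkeeping.

    ```java
    import java.util.HashSet;
    import java.util.Set;
    import java.util.TreeSet;

    // Hypothetical, simplified stand-in for KafkaSpoutMessageId (single partition).
    final class SketchMessageId {
        final long offset;
        final boolean nullTuple;

        SketchMessageId(long offset, boolean nullTuple) {
            this.offset = offset;
            this.nullTuple = nullTuple;
        }

        // Identity is the offset, so a fresh id for the same offset matches the emitted one.
        @Override
        public boolean equals(Object o) {
            return o instanceof SketchMessageId && ((SketchMessageId) o).offset == offset;
        }

        @Override
        public int hashCode() {
            return Long.hashCode(offset);
        }
    }

    // Hypothetical ack handler showing the guard-clause structure suggested above.
    final class AckGuardSketch {
        final Set<SketchMessageId> emitted = new HashSet<>();
        final TreeSet<Long> acked = new TreeSet<>();
        int ignoredAcks = 0;

        void ack(SketchMessageId msgId) {
            // Guard clause: a null tuple was never emitted, so it is acked directly.
            if (msgId.nullTuple) {
                acked.add(msgId.offset);
                return;
            }
            if (emitted.contains(msgId)) {
                // Normal path: the tuple was emitted and is now fully processed.
                emitted.remove(msgId);
                acked.add(msgId.offset);
            } else {
                // Unknown id, e.g. its partition was reassigned; take no action.
                ignoredAcks++;
            }
        }
    }
    ```

    Acking a null tuple returns early without touching the emitted set, and an ack for an id that is not in the emitted set is only counted and otherwise ignored, so the nesting stays one level deep.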


---

[GitHub] storm pull request #2593: STORM-2994 KafkaSpout commit offsets for null tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2593#discussion_r178571304
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +package org.apache.storm.kafka.spout;
    +
    +
    +import org.apache.kafka.clients.consumer.ConsumerConfig;
    +import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
    +import org.apache.storm.utils.Time;
    +import org.junit.Test;
    +
    +import java.util.regex.Pattern;
    +
    +import static org.mockito.ArgumentMatchers.*;
    +import static org.mockito.Mockito.never;
    +import static org.mockito.Mockito.verify;
    +
    +public class KafkaSpoutNullTupleTest extends KafkaSpoutAbstractTest {
    +    private final int maxPollRecords = 10;
    +    private final int maxRetries = 3;
    +
    +    public KafkaSpoutNullTupleTest() {
    +        super(2_000);
    +    }
    +
    +
    +    @Override
    +    KafkaSpoutConfig<String, String> createSpoutConfig() {
    +        return SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfigNullTuples(
    +            KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
    +                Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC)))
    +            .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
    +            .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
    --- End diff --
    
    Nit: Unless it's important to the test, I'd leave this config out or use the SingleTopicKafkaSpoutConfiguration utility to set common configuration. Same for maxPollRecords.


---

[GitHub] storm pull request #2593: STORM-2994 KafkaSpout commit offsets for null tupl...

Posted by reiabreu <gi...@git.apache.org>.
Github user reiabreu commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2593#discussion_r178510737
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -484,8 +484,11 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
                         return true;
                     }
                 } else {
    +                /*if a null tuple is not configured to be emitted, it should be marked as emitted and acked immediately
    +                * to allow its offset to be commited to Kafka*/
                     LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", record);
                     msgId.setEmitted(false);
    --- End diff --
    
    I concur. I'll refactor it.
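
    The code comment in this diff says a null tuple that is not emitted must still be acked so its offset can be committed to Kafka. A minimal sketch of why, assuming a single partition with strictly consecutive offsets (no compaction gaps) and a hypothetical ContiguousCommitSketch class rather than the spout's real offset manager: the commit position only advances across an unbroken run of acked offsets, so a single offset that is never acked holds back everything after it.

    ```java
    import java.util.TreeSet;

    // Hypothetical, simplified model of per-partition commit bookkeeping.
    final class ContiguousCommitSketch {
        private final TreeSet<Long> acked = new TreeSet<>();
        // Next offset the consumer group would read, i.e. the value Kafka stores on commit.
        private long committedOffset;

        ContiguousCommitSketch(long initialOffset) {
            this.committedOffset = initialOffset;
        }

        void ack(long offset) {
            acked.add(offset);
        }

        // Advances over acked offsets with no gaps and returns the new commit position.
        long commit() {
            while (acked.remove(committedOffset)) {
                committedOffset++;
            }
            return committedOffset;
        }
    }
    ```

    If offset 1 is a null tuple that is never acked, acking 0 and 2 only moves the commit to 1; once offset 1 is acked directly as well, the commit reaches 3.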


---

[GitHub] storm issue #2593: STORM-2994 KafkaSpout commit offsets for null tuples

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2593
  
    @reiabreu Kind reminder.


---

[GitHub] storm issue #2593: STORM-2994 KafkaSpout doesn't commit null tuples offsets

Posted by reiabreu <gi...@git.apache.org>.
Github user reiabreu commented on the issue:

    https://github.com/apache/storm/pull/2593
  
    @srdo Sure, I'll have a look


---

[GitHub] storm pull request #2593: STORM-2994 KafkaSpout commit offsets for null tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2593#discussion_r178562679
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -570,20 +572,25 @@ public void ack(Object messageId) {
             // Only need to keep track of acked tuples if commits to Kafka are controlled by
             // tuple acks, which happens only for at-least-once processing semantics
             final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    -        if (!emitted.contains(msgId)) {
    -            if (msgId.isEmitted()) {
    +        if (!msgId.isNullTuple()) {
    --- End diff --
    
    Great, that sounds good.


---

[GitHub] storm pull request #2593: STORM-2994 KafkaSpout commit offsets for null tupl...

Posted by reiabreu <gi...@git.apache.org>.
Github user reiabreu commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2593#discussion_r178559146
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -570,20 +572,25 @@ public void ack(Object messageId) {
             // Only need to keep track of acked tuples if commits to Kafka are controlled by
             // tuple acks, which happens only for at-least-once processing semantics
             final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    -        if (!emitted.contains(msgId)) {
    -            if (msgId.isEmitted()) {
    +        if (!msgId.isNullTuple()) {
    --- End diff --
    
    Sounds good, will update it.
    Regarding tests, we need to add at least one: 
    
    Test that all messages are committed when the spout is not set to emit null tuples:
    
    ```
    @Test
    public void testShouldCommitAllMessagesIfNotSetToEmitNullTuples() throws Exception {
        final int messageCount = 10;
        prepareSpout(messageCount);
    
        // All null tuples should be committed, meaning they were considered emitted and acked
        for (int i = 0; i < messageCount; i++) {
            spout.nextTuple();
        }
    
        // Nothing should reach the collector, since every record translates to null
        verify(collectorMock, never()).emit(
                anyString(),
                anyList(),
                any());
    
        Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
        // Commit offsets
        spout.nextTuple();
    
        verifyAllMessagesCommitted(messageCount);
    }
    ```
    We would need a new SingleTopicKafkaSpoutConfiguration with something like:
    
    ```
    private static class NullRecordExtractor implements RecordTranslator<String, String> {
    
        // Translate every record to null, so the spout only ever sees null tuples
        @Override
        public List<Object> apply(ConsumerRecord<String, String> record) {
            return null;
        }
    
        @Override
        public Fields getFieldsFor(String stream) {
            return new Fields("topic", "key", "value");
        }
    }
    ```
    
    I was planning to extend KafkaSpoutAbstractTest in a new test class, similar to KafkaSpoutSingleTopicTest.



---

[GitHub] storm issue #2593: STORM-2994 KafkaSpout doesn't commit null tuples offsets

Posted by reiabreu <gi...@git.apache.org>.
Github user reiabreu commented on the issue:

    https://github.com/apache/storm/pull/2593
  
    Thank you for guiding me through the changes. I've squashed all the commits and pushed a fresh one.


---

[GitHub] storm issue #2593: STORM-2994 KafkaSpout doesn't commit null tuples offsets

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2593
  
    +1, thanks for your patience. Please squash to one commit, and we can merge.


---

[GitHub] storm pull request #2593: STORM-2994 KafkaSpout commit offsets for null tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2593#discussion_r175256064
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -484,8 +484,11 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
                         return true;
                     }
                 } else {
    +                /*if a null tuple is not configured to be emitted, it should be marked as emitted and acked immediately
    +                * to allow its offset to be commited to Kafka*/
                     LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", record);
                     msgId.setEmitted(false);
    --- End diff --
    
    I'd like to see this property renamed to "isFiltered", "isNullTuple", or something similar. Right now we have both a list of emitted tuples and an isEmitted property, and they don't always match, which seems unnecessarily confusing.
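
    A hedged sketch of what the rename could look like, using hypothetical simplified types (NullTupleMessageId, EmitSketch, a Function standing in for the record translator, and an emitNullTuples flag standing in for the spout's emit-null-tuples setting). The flag only ever means "this record translated to null and was not emitted", so it cannot disagree with the emitted set: an id is either flagged or tracked in emitted, never both.

    ```java
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.function.Function;

    // Hypothetical, simplified message id: the isEmitted property renamed to isNullTuple.
    final class NullTupleMessageId {
        final long offset;
        // True only when the record translated to null and was deliberately not emitted.
        private boolean nullTuple;

        NullTupleMessageId(long offset) {
            this.offset = offset;
        }

        boolean isNullTuple() {
            return nullTuple;
        }

        void setNullTuple(boolean nullTuple) {
            this.nullTuple = nullTuple;
        }
    }

    // Hypothetical emit path; a real spout would also call collector.emit(...).
    final class EmitSketch {
        final Set<NullTupleMessageId> emitted = new HashSet<>();
        final List<NullTupleMessageId> directlyAcked = new ArrayList<>();
        private final Function<String, List<Object>> translator;
        private final boolean emitNullTuples;

        EmitSketch(Function<String, List<Object>> translator, boolean emitNullTuples) {
            this.translator = translator;
            this.emitNullTuples = emitNullTuples;
        }

        // Returns true if a tuple was emitted (and is tracked in emitted) for the record.
        boolean emitOrSkip(long offset, String value) {
            NullTupleMessageId msgId = new NullTupleMessageId(offset);
            List<Object> tuple = translator.apply(value);
            if (tuple == null && !emitNullTuples) {
                // Never emitted: flag it and ack it right away so commits are not blocked.
                msgId.setNullTuple(true);
                directlyAcked.add(msgId);
                return false;
            }
            emitted.add(msgId);
            return true;
        }
    }
    ```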


---

[GitHub] storm pull request #2593: STORM-2994 KafkaSpout commit offsets for null tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2593#discussion_r178571949
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +package org.apache.storm.kafka.spout;
    +
    +
    +import org.apache.kafka.clients.consumer.ConsumerConfig;
    +import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
    +import org.apache.storm.utils.Time;
    +import org.junit.Test;
    +
    +import java.util.regex.Pattern;
    +
    +import static org.mockito.ArgumentMatchers.*;
    +import static org.mockito.Mockito.never;
    +import static org.mockito.Mockito.verify;
    +
    +public class KafkaSpoutNullTupleTest extends KafkaSpoutAbstractTest {
    +    private final int maxPollRecords = 10;
    +    private final int maxRetries = 3;
    +
    +    public KafkaSpoutNullTupleTest() {
    +        super(2_000);
    +    }
    +
    +
    +    @Override
    +    KafkaSpoutConfig<String, String> createSpoutConfig() {
    +        return SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfigNullTuples(
    --- End diff --
    
    I think you can simplify this by using SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder, and simply setting the record translator. I don't think there's a reason to put a whole new configuration in SingleTopicKafkaSpoutConfiguration. I'd also rather keep the record translator local to this class, since we're not using it in other tests.


---

[GitHub] storm pull request #2593: STORM-2994 KafkaSpout commit offsets for null tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2593#discussion_r175256133
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -576,6 +579,8 @@ public void ack(Object messageId) {
                         + "came from a topic-partition that this consumer group instance is no longer tracking "
                         + "due to rebalance/partition reassignment. No action taken.", msgId);
                 } else {
    +                //a null tuple should be added to the ack list since by definition is a direct ack
    --- End diff --
    
    We should probably switch the order of the emitted.contains(msgId) and msgId.isEmitted() checks. It is not possible for isEmitted to be false while the tuple is in the emitted list.


---

[GitHub] storm issue #2593: STORM-2994 KafkaSpout commit offsets for null tuples

Posted by reiabreu <gi...@git.apache.org>.
Github user reiabreu commented on the issue:

    https://github.com/apache/storm/pull/2593
  
    KafkaSpoutAbstractTest is tightly coupled to KafkaSpoutConfig through createSpoutConfig, meaning that to test a single configuration change we need to create a new test class. This is something we can redesign in the future.
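
    One possible redesign, sketched with hypothetical stand-in types rather than the real KafkaSpoutConfig or test classes: let the shared test setup receive its config instead of hard-wiring it through an overridden createSpoutConfig(), and derive variants by copying a common config with one setting changed. JUnit's Parameterized runner would be another way to run the same tests against several configs.

    ```java
    // Hypothetical stand-in for KafkaSpoutConfig, holding only what this sketch needs.
    final class SpoutConfigSketch {
        final boolean emitNullTuples;
        final long offsetCommitPeriodMs;

        SpoutConfigSketch(boolean emitNullTuples, long offsetCommitPeriodMs) {
            this.emitNullTuples = emitNullTuples;
            this.offsetCommitPeriodMs = offsetCommitPeriodMs;
        }

        // Returns a copy with one setting changed, so a test can vary a single option.
        SpoutConfigSketch withEmitNullTuples(boolean value) {
            return new SpoutConfigSketch(value, offsetCommitPeriodMs);
        }
    }

    // Shared setup receives its config (composition) instead of each subclass
    // overriding a factory method on an abstract base test.
    final class SpoutTestHarness {
        private final SpoutConfigSketch config;

        SpoutTestHarness(SpoutConfigSketch config) {
            this.config = config;
        }

        SpoutConfigSketch config() {
            return config;
        }

        // Summarizes the configuration this harness was built with.
        String describe() {
            return "emitNullTuples=" + config.emitNullTuples
                + ", commitPeriodMs=" + config.offsetCommitPeriodMs;
        }
    }
    ```

    With this shape, a null-tuple test would build its harness from the common config plus withEmitNullTuples(false), with no extra subclass.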


---

[GitHub] storm issue #2593: STORM-2994 KafkaSpout commit offsets for null tuples

Posted by reiabreu <gi...@git.apache.org>.
Github user reiabreu commented on the issue:

    https://github.com/apache/storm/pull/2593
  
    Hi folks! I won't have access to a computer until next weekend. I'll pick
    it up then. Thank you for understanding.
    
    On Wed, 28 Mar 2018, 08:15 Jungtaek Lim, <no...@github.com> wrote:
    
    > @reiabreu <https://github.com/reiabreu> kindly reminder.



---

[GitHub] storm pull request #2593: STORM-2994 KafkaSpout doesn't commit null tuples o...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2593


---

[GitHub] storm issue #2593: STORM-2994 KafkaSpout doesn't commit null tuples offsets

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2593
  
    @reiabreu Merged to master. Looks like there are some small modifications to do to get it on to 1.x or earlier branches. Could you put up a PR against 1.x as well (should be pretty easy with `git cherry-pick`)?


---

[GitHub] storm pull request #2593: STORM-2994 KafkaSpout commit offsets for null tupl...

Posted by reiabreu <gi...@git.apache.org>.
Github user reiabreu commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2593#discussion_r178586564
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +package org.apache.storm.kafka.spout;
    +
    +
    +import org.apache.kafka.clients.consumer.ConsumerConfig;
    +import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
    +import org.apache.storm.utils.Time;
    +import org.junit.Test;
    +
    +import java.util.regex.Pattern;
    +
    +import static org.mockito.ArgumentMatchers.*;
    +import static org.mockito.Mockito.never;
    +import static org.mockito.Mockito.verify;
    +
    +public class KafkaSpoutNullTupleTest extends KafkaSpoutAbstractTest {
    +    private final int maxPollRecords = 10;
    +    private final int maxRetries = 3;
    +
    +    public KafkaSpoutNullTupleTest() {
    +        super(2_000);
    +    }
    +
    +
    +    @Override
    +    KafkaSpoutConfig<String, String> createSpoutConfig() {
    +        return SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfigNullTuples(
    --- End diff --
    
    Absolutely, will simplify it


---