Posted to dev@storm.apache.org by askprasanna <gi...@git.apache.org> on 2017/06/07 12:38:54 UTC

[GitHub] storm pull request #2152: [STORM-2505] OffsetManager should account for topi...

GitHub user askprasanna opened a pull request:

    https://github.com/apache/storm/pull/2152

    [STORM-2505] OffsetManager should account for topic voids when returning the count of committed offsets

    The logic currently does a simple subtraction between the first and last offsets in the committed range. If the committed range spans topic voids, the result is incorrect. The incorrect count is always on the higher side, which affects the logic that enforces the maximum number of uncommitted offsets allowed in the spout.
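To make the overcount concrete, here is a minimal Java sketch. It assumes the committed range is described by its first and last offsets and that the acked offsets are kept in a sorted set. The class and method names (`CommittedCountSketch`, `naiveCount`, `gapAwareCount`) are hypothetical and are not the actual `OffsetManager` API, and the exact arithmetic of the original code is not shown in this thread. The offsets mirror the unit test discussed below: 0-4 and 8-9, with a void at 5-7.

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical illustration: compares a subtraction-based count of committed
// offsets with a count that only includes offsets actually seen by the consumer.
public class CommittedCountSketch {

    // Subtraction-based count: assumes every offset in [first, last] exists.
    static long naiveCount(long firstOffset, long lastOffset) {
        return lastOffset - firstOffset + 1;
    }

    // Gap-aware count: only counts acked offsets inside [first, last].
    static long gapAwareCount(NavigableSet<Long> ackedOffsets, long firstOffset, long lastOffset) {
        return ackedOffsets.subSet(firstOffset, true, lastOffset, true).size();
    }

    public static void main(String[] args) {
        // Offsets 5, 6 and 7 form an offset void: the consumer never returned them.
        NavigableSet<Long> acked = new TreeSet<>(Arrays.asList(0L, 1L, 2L, 3L, 4L, 8L, 9L));
        System.out.println("naive=" + naiveCount(0, 9));              // naive=10
        System.out.println("gapAware=" + gapAwareCount(acked, 0, 9)); // gapAware=7
    }
}
```

With the void at 5-7, the subtraction-based count reports 10 offsets while only 7 were committed. A void can only remove offsets from the range, never add any, so the error is always on the high side.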

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/askprasanna/storm master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2152.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2152
    
----
commit 5ae01efe4d2bf34e84bc342b1caf09ae8ce8e516
Author: Prasanna Ranganathan <pr...@flipkart.com>
Date:   2017-06-07T12:33:09Z

    [STORM-2505] OffsetManager should account for topic voids when returning the count of committed offsets

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for offset void...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/2152
  
    @askprasanna let's just create a new JIRA and associate it with the STORM-2505 JIRA. It's a quick thing and will take care of this cleanly. 
    
    Another minor detail: we have a script to find orphan pull requests, malformed pull requests, etc. The pull request title `[STORM-2505] OffsetManager...` does not conform to the naming convention. Can you please rename it to follow the pattern `STORM-2505: OffsetManager...`? Thanks.
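A minimal sketch of the kind of title check described above, assuming the convention is simply `STORM-<number>: <description>` at the start of the title. The actual script is not shown in this thread, so the class name and regular expression below are hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical title check; the real script referenced above is not shown here.
public class PrTitleCheck {

    // Expected form: "STORM-<number>: <description>"
    private static final Pattern TITLE = Pattern.compile("^STORM-\\d+: \\S.*");

    static boolean conforms(String title) {
        return TITLE.matcher(title).matches();
    }

    public static void main(String[] args) {
        System.out.println(conforms("[STORM-2505] OffsetManager should account for offset voids")); // false
        System.out.println(conforms("STORM-2505: OffsetManager should account for offset voids"));  // true
    }
}
```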


[GitHub] storm pull request #2152: [STORM-2505] OffsetManager should account for offs...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2152#discussion_r127538620
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Copyright 2017 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.kafka.spout;
    +
    +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Matchers.*;
    +import static org.mockito.Mockito.*;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.clients.consumer.*;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
    +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.Time.SimulatedTime;
    +import org.junit.Test;
    +import org.mockito.ArgumentCaptor;
    +import org.mockito.Captor;
    +import org.mockito.InOrder;
    +import org.mockito.MockitoAnnotations;
    +
    +public class KafkaSpoutCommitTest {
    +
    +    private final long offsetCommitPeriodMs = 2_000;
    +    private final TopologyContext contextMock = mock(TopologyContext.class);
    +    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
    +    private final Map<String, Object> conf = new HashMap<>();
    +    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
    +    private KafkaConsumer<String, String> consumerMock;
    +    private KafkaSpout<String, String> spout;
    +    private KafkaSpoutConfig spoutConfig;
    +
    +    @Captor
    +    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
    +
    +    private void setupSpout(Set<TopicPartition> assignedPartitions) {
    +        MockitoAnnotations.initMocks(this);
    +        spoutConfig = getKafkaSpoutConfigBuilder(-1)
    +                .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
    +                .build();
    +
    +        consumerMock = mock(KafkaConsumer.class);
    +        KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
    +
    +        //Set up a spout listening to 1 topic partition
    +        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
    +
    +        spout.open(conf, contextMock, collectorMock);
    +        spout.activate();
    +
    +        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
    +        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
    +
    +        //Assign partitions to the spout
    +        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
    +        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
    +    }
    +
    +    @Test
    +    public void testCommitSuccessWithOffsetVoids() {
    +        //Verify that the commit logic can handle offset voids
    +        try (SimulatedTime simulatedTime = new SimulatedTime()) {
    +            setupSpout(Collections.singleton(partition));
    +            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
    +            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
    +            // Offsets emitted are 0,1,2,3,4,...,8,9
    +            for (int i = 0; i < 5; i++) {
    +                recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
    +            }
    +            for (int i = 8; i < 10; i++) {
    +                recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
    +            }
    +            records.put(partition, recordsForPartition);
    +
    +            when(consumerMock.poll(anyLong()))
    +                    .thenReturn(new ConsumerRecords(records));
    +
    +            for (int i = 0; i < recordsForPartition.size(); i++) {
    +                spout.nextTuple();
    +            }
    +
    +            ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
    +            verify(collectorMock, times(recordsForPartition.size())).emit(anyObject(), anyObject(), messageIds.capture());
    +
    +            for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
    +                spout.ack(messageId);
    +            }
    +
    +            // Advance time and then trigger first call to kafka consumer commit; the commit will progress till offset 4
    +            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
    +            when(consumerMock.poll(anyLong()))
    +                    .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
    +            spout.nextTuple();
    +
    +            InOrder inOrder = inOrder(consumerMock);
    +            inOrder.verify(consumerMock).commitSync(commitCapture.capture());
    +            inOrder.verify(consumerMock).poll(anyLong());
    +
    +            //verify that Offset 4 was last committed offset
    +            //the offset void should be bridged in the next commit
    +            Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
    +            assertTrue(commits.containsKey(partition));
    +            assertEquals(4, commits.get(partition).offset());
    +
    +            //Trigger second kafka consumer commit
    +            reset(consumerMock);
    +            when(consumerMock.poll(anyLong()))
    +                    .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
    +            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
    +            spout.nextTuple();
    +
    +            inOrder = inOrder(consumerMock);
    +            inOrder.verify(consumerMock).commitSync(commitCapture.capture());
    +            inOrder.verify(consumerMock).poll(anyLong());
    --- End diff --
    
    Same as above


[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for offset void...

Posted by askprasanna <gi...@git.apache.org>.
Github user askprasanna commented on the issue:

    https://github.com/apache/storm/pull/2152
  
    I picked the existing JIRA since this is a follow-up to the same issue and addresses something that was missed in the earlier commits. Can we not reopen it and re-resolve it?


[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for offset void...

Posted by askprasanna <gi...@git.apache.org>.
Github user askprasanna commented on the issue:

    https://github.com/apache/storm/pull/2152
  
    Missed adding the test. Will do so.


[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for offset void...

Posted by askprasanna <gi...@git.apache.org>.
Github user askprasanna commented on the issue:

    https://github.com/apache/storm/pull/2152
  
    Will be happy to do that. I haven't checked fully and am being a bit lazy here: we are using lambdas only in the unit tests, right?


[GitHub] storm issue #2152: STORM-2639: Kafka Spout incorrectly computes numCommitted...

Posted by askprasanna <gi...@git.apache.org>.
Github user askprasanna commented on the issue:

    https://github.com/apache/storm/pull/2152
  
    Created a new JIRA and updated PR title as recommended.


[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for offset void...

Posted by askprasanna <gi...@git.apache.org>.
Github user askprasanna commented on the issue:

    https://github.com/apache/storm/pull/2152
  
    Have made the necessary changes. LMK if anything else is needed for the PR to be merged.


[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for offset void...

Posted by askprasanna <gi...@git.apache.org>.
Github user askprasanna commented on the issue:

    https://github.com/apache/storm/pull/2152
  
    Squashed the commits. Also opened https://github.com/apache/storm/pull/2217 for 1.x branch


[GitHub] storm pull request #2152: [STORM-2505] OffsetManager should account for offs...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2152#discussion_r127538410
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java ---
    @@ -0,0 +1,146 @@
    +            // Advance time and then trigger first call to kafka consumer commit; the commit will progress till offset 4
    +            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
    +            when(consumerMock.poll(anyLong()))
    +                    .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
    +            spout.nextTuple();
    +
    +            InOrder inOrder = inOrder(consumerMock);
    +            inOrder.verify(consumerMock).commitSync(commitCapture.capture());
    +            inOrder.verify(consumerMock).poll(anyLong());
    --- End diff --
    
    Nit: I don't think it matters to the test that poll is called. Consider removing this and the InOrder if you agree.


[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for topic voids...

Posted by askprasanna <gi...@git.apache.org>.
Github user askprasanna commented on the issue:

    https://github.com/apache/storm/pull/2152
  
    I am referring to valid gaps in the offsets seen by the kafka consumer under certain conditions.
    
    I am using the same terminology as when we created PR https://github.com/apache/storm/pull/2104. This is a follow-up to that PR. Kindly refer to https://issues.apache.org/jira/browse/STORM-2505 for details.


[GitHub] storm pull request #2152: [STORM-2505] OffsetManager should account for offs...

Posted by askprasanna <gi...@git.apache.org>.
Github user askprasanna commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2152#discussion_r121249114
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -104,6 +104,10 @@ public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
             this.kafkaSpoutConfig = kafkaSpoutConfig;
         }
     
    +    long getNumUncommittedOffsets() {
    --- End diff --
    
    Yes, the main purpose of the test is verifying commit behavior over offset gaps. We can remove this getter and the asserts on it if required.
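For context on why the committed count matters, here is a simplified, hypothetical model of the spout's bookkeeping. It assumes the spout increments a counter on every emit, subtracts the committed count reported by the offset manager after each commit, and only polls while the counter is below `maxUncommittedOffsets`. The class below is an illustration under those assumptions, not the actual `KafkaSpout` code.

```java
// Hypothetical model of the spout's uncommitted-offset bookkeeping.
public class UncommittedOffsetsSketch {

    private final long maxUncommittedOffsets;
    private long numUncommittedOffsets;

    UncommittedOffsetsSketch(long maxUncommittedOffsets) {
        this.maxUncommittedOffsets = maxUncommittedOffsets;
    }

    void onEmit() {
        numUncommittedOffsets++;
    }

    // committedCount is whatever the offset manager reports for this commit.
    void onCommit(long committedCount) {
        numUncommittedOffsets -= committedCount;
    }

    // Under this model, the spout only polls while below the limit.
    boolean mayPoll() {
        return numUncommittedOffsets < maxUncommittedOffsets;
    }

    long getNumUncommittedOffsets() {
        return numUncommittedOffsets;
    }

    public static void main(String[] args) {
        // Offsets 0-4 and 8-9 are emitted: 7 tuples, with a void at 5-7.
        UncommittedOffsetsSketch naive = new UncommittedOffsetsSketch(10);
        UncommittedOffsetsSketch gapAware = new UncommittedOffsetsSketch(10);
        for (int i = 0; i < 7; i++) {
            naive.onEmit();
            gapAware.onEmit();
        }
        naive.onCommit(9 - 0 + 1); // subtraction-based count: 10
        gapAware.onCommit(7);      // only the offsets that actually exist
        System.out.println(naive.getNumUncommittedOffsets());    // -3
        System.out.println(gapAware.getNumUncommittedOffsets()); // 0
    }
}
```

With the subtraction-based count the counter ends at -3 instead of 0, so under this model the spout could hold three more uncommitted offsets than `maxUncommittedOffsets` allows before it stops polling.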


[GitHub] storm issue #2152: STORM-2639: Kafka Spout incorrectly computes numCommitted...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2152
  
    +1 again


[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for offset void...

Posted by askprasanna <gi...@git.apache.org>.
Github user askprasanna commented on the issue:

    https://github.com/apache/storm/pull/2152
  
    Added the unit test. 


[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for offset void...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2152
  
    Yes, it looks like there are only lambdas in the test code for these two PRs.


[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for offset void...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2152
  
    I forgot to mention it, but this seems applicable to the 1.x branch as well. We can't cherry pick this back since it uses a few lambdas, so would you be willing to make a PR for a backport? This would also be the case for https://github.com/apache/storm/pull/2153.
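As noted above, the lambdas prevent a straight cherry-pick to 1.x, so a lambda such as `(kafkaSpoutConfig) -> consumerMock` from the test would need to become an anonymous inner class in a backport. The sketch below shows both forms side by side. `ConsumerFactory` is a hypothetical stand-in with the same single-method shape as that lambda, since the real `KafkaConsumerFactory` declaration is not shown in this thread.

```java
// Hypothetical stand-in for the consumer factory used in the test; the real
// KafkaConsumerFactory declaration is not shown in this thread.
interface ConsumerFactory<C, R> {
    R create(C config);
}

public class LambdaBackportSketch {

    // Java 8 form, as used in the test on master.
    static ConsumerFactory<String, String> lambdaFactory(final String consumer) {
        return (config) -> consumer;
    }

    // Java 7 compatible form for a 1.x backport: an anonymous inner class.
    // Explicit type arguments are needed (no diamond with anonymous classes),
    // and the captured variable must be declared final.
    static ConsumerFactory<String, String> anonymousFactory(final String consumer) {
        return new ConsumerFactory<String, String>() {
            @Override
            public String create(String config) {
                return consumer;
            }
        };
    }

    public static void main(String[] args) {
        // Both factories ignore the config and return the same placeholder consumer.
        System.out.println(lambdaFactory("consumerMock").create("config"));
        System.out.println(anonymousFactory("consumerMock").create("config"));
    }
}
```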


[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for topic voids...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/2152
  
    @askprasanna what do you mean by "topic void" ?


[GitHub] storm pull request #2152: [STORM-2505] OffsetManager should account for offs...

Posted by askprasanna <gi...@git.apache.org>.
Github user askprasanna commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2152#discussion_r127695589
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java ---
    @@ -0,0 +1,146 @@
    +    @Test
    +    public void testCommitSuccessWithOffsetVoids() {
    +        //Verify that the commit logic can handle offset voids
    +        try (SimulatedTime simulatedTime = new SimulatedTime()) {
    +            setupSpout(Collections.singleton(partition));
    +            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
    +            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
    +            // Offsets emitted are 0,1,2,3,4,...,8,9
    --- End diff --
    
    will tweak the comment.
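For reference, the two loops in the test emit offsets 0-4 and then 8-9, so 5-7 are the void. The comment `0,1,2,3,4,...,8,9` can be read as if 5-7 were included. A small sketch reproducing those loops (the class name is hypothetical) shows the actual sequence:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical reproduction of the two loops that build the test's records.
public class EmittedOffsetsSketch {

    static List<Long> emittedOffsets() {
        List<Long> offsets = new ArrayList<>();
        for (long i = 0; i < 5; i++) {   // first loop: offsets 0-4
            offsets.add(i);
        }
        for (long i = 8; i < 10; i++) {  // second loop: offsets 8-9
            offsets.add(i);
        }
        return offsets;
    }

    public static void main(String[] args) {
        String joined = emittedOffsets().stream()
                .map(String::valueOf)
                .collect(Collectors.joining(","));
        System.out.println(joined); // 0,1,2,3,4,8,9
    }
}
```

A comment such as `// Offsets emitted are 0,1,2,3,4,8,9` would match this sequence.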


[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for offset void...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2152
  
    @askprasanna 
    Maybe we need to associate this with a new issue to track your effort (given that the assignee for STORM-2505 is not you and that can't be overwritten), or drop the issue reference altogether. Apart from the test code, I think it's a fairly small change that doesn't strictly require filing an issue.


[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for offset void...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2152
  
    I'm not sure if we should have a new JIRA issue for this btw. The issue this links to is already marked fixed.


[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for offset void...

Posted by askprasanna <gi...@git.apache.org>.
Github user askprasanna commented on the issue:

    https://github.com/apache/storm/pull/2152
  
    Updated the title to mention offset voids. "Topic void" was a typo.


[GitHub] storm pull request #2152: STORM-2639: Kafka Spout incorrectly computes numCo...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2152


[GitHub] storm pull request #2152: [STORM-2505] OffsetManager should account for offs...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2152#discussion_r121217733
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java ---
    @@ -0,0 +1,145 @@
    +/*
    + * Copyright 2017 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.kafka.spout;
    +
    +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Matchers.*;
    +import static org.mockito.Mockito.*;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.clients.consumer.*;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
    +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.Time.SimulatedTime;
    +import org.junit.Test;
    +import org.mockito.ArgumentCaptor;
    +import org.mockito.InOrder;
    +
    +public class KafkaSpoutCommitTest {
    +
    +    private final long offsetCommitPeriodMs = 2_000;
    +    private final TopologyContext contextMock = mock(TopologyContext.class);
    +    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
    +    private final Map<String, Object> conf = new HashMap<>();
    +    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
    +    private KafkaConsumer<String, String> consumerMock;
    +    private KafkaSpout<String, String> spout;
    +    private KafkaSpoutConfig spoutConfig;
    +
    +    private void setupSpout(Set<TopicPartition> assignedPartitions) {
    +        spoutConfig = getKafkaSpoutConfigBuilder(-1)
    +                .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
    +                .build();
    +
    +        consumerMock = mock(KafkaConsumer.class);
    +        KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
    +
    +        //Set up a spout listening to 1 topic partition
    +        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
    +
    +        spout.open(conf, contextMock, collectorMock);
    +        spout.activate();
    +
    +        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
    +        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
    +
    +        //Assign partitions to the spout
    +        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
    +        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
    +    }
    +
    +    @Test
    +    public void testCommitSuccessWithOffsetVoids() {
    +        //Verify that the commit logic can handle offset voids
    +        try (SimulatedTime simulatedTime = new SimulatedTime()) {
    +            setupSpout(Collections.singleton(partition));
    +            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
    +            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
    +            // Offsets emitted are 0,1,2,3,4,...,8,9
    +            for (int i = 0; i < 5; i++) {
    +                recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
    +            }
    +            for (int i = 8; i < 10; i++) {
    +                recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
    +            }
    +            records.put(partition, recordsForPartition);
    +
    +            when(consumerMock.poll(anyLong()))
    +                    .thenReturn(new ConsumerRecords(records));
    +
    +            for (int i = 0; i < recordsForPartition.size(); i++) {
    +                spout.nextTuple();
    +            }
    +
    +            ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
    +            verify(collectorMock, times(recordsForPartition.size())).emit(anyObject(), anyObject(), messageIds.capture());
    +
    +            for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
    +                spout.ack(messageId);
    +            }
    +
    +            // all tuples emitted are pending commit
    +            assertEquals(recordsForPartition.size(), spout.getNumUncommittedOffsets());
    +
    +            // Advance time and then trigger first call to kafka consumer commit; the commit will progress till offset 4
    +            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
    +            when(consumerMock.poll(anyLong()))
    +                    .thenReturn(new ConsumerRecords(Collections.EMPTY_MAP));
    +            spout.nextTuple();
    +
    +            ArgumentCaptor<Map> committedOffsets=ArgumentCaptor.forClass(Map.class);
    +            InOrder inOrder = inOrder(consumerMock);
    +            inOrder.verify(consumerMock).commitSync(committedOffsets.capture());
    +            inOrder.verify(consumerMock).poll(anyLong());
    +
    +            //verify that Offset 4 was last committed offset and 5 offsets were committed
    +            //the offset void should be bridged in the next commit
    +            assertTrue(committedOffsets.getValue().containsKey(partition));
    +            assertEquals(((OffsetAndMetadata) (committedOffsets.getValue().get(partition))).offset(), 4);
    +            assertEquals(2, spout.getNumUncommittedOffsets());
    +
    +            //Trigger second kafka consumer commit
    +            reset(consumerMock);
    +            when(consumerMock.poll(anyLong()))
    +                    .thenReturn(new ConsumerRecords(Collections.EMPTY_MAP));
    --- End diff --
    
    Nit: You can put generic type parameters on ConsumerRecords and use Collections.emptyMap() instead of Collections.EMPTY_MAP to avoid rawtypes warnings.
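A minimal, stdlib-only illustration of this nit. `Collections.EMPTY_MAP` is a raw `Map`, so passing it to a raw `new ConsumerRecords(...)` typically triggers rawtypes and unchecked warnings, whereas the generic method `Collections.emptyMap()` lets the compiler infer the key and value types from the constructor's parameter type. `RecordBatch<K, V>` below is a hypothetical stand-in for Kafka's `ConsumerRecords<K, V>`, used only so the example compiles without Kafka on the classpath; in the test itself, the analogous expression would be `new ConsumerRecords<String, String>(Collections.emptyMap())`.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class TypedEmptyMapSketch {
    /** Hypothetical stand-in for Kafka's ConsumerRecords<K, V>; not the real class. */
    static final class RecordBatch<K, V> {
        private final Map<String, List<Map.Entry<K, V>>> recordsByPartition;

        RecordBatch(Map<String, List<Map.Entry<K, V>>> recordsByPartition) {
            this.recordsByPartition = recordsByPartition;
        }

        int count() {
            int total = 0;
            for (List<Map.Entry<K, V>> records : recordsByPartition.values()) {
                total += records.size();
            }
            return total;
        }
    }

    static RecordBatch<String, String> emptyBatch() {
        // Collections.emptyMap() infers Map<String, List<Map.Entry<String, String>>> from the
        // constructor parameter, so no raw types or unchecked conversions are involved.
        // new RecordBatch(Collections.EMPTY_MAP) would also compile, but only as a raw type,
        // producing rawtypes/unchecked warnings under -Xlint.
        return new RecordBatch<String, String>(Collections.emptyMap());
    }

    public static void main(String[] args) {
        System.out.println(emptyBatch().count()); // prints 0
    }
}
```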


[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for offset void...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2152
  
    This seems reasonable to me. It would be good to have a test too; you could probably fake log compaction by providing non-sequential records to a mocked KafkaConsumer, similar to how KafkaSpoutEmitTest does it. Either way, +1.
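To make "fake log compaction" concrete: a compacted partition simply hands back records whose offsets are not contiguous. The stdlib-only sketch below (hypothetical helper names, not part of Storm or Kafka, and no mocked KafkaConsumer involved) builds the same offsets the PR's test uses, 0 to 4 followed by 8 and 9, and shows why subtracting the first offset from the last overcounts when the range spans a void.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetVoidSketch {
    /**
     * Returns the offsets missing between consecutive entries of a sorted offset list.
     * Hypothetical helper for illustration only.
     */
    static List<Long> findVoids(List<Long> sortedOffsets) {
        List<Long> voids = new ArrayList<>();
        for (int i = 1; i < sortedOffsets.size(); i++) {
            for (long missing = sortedOffsets.get(i - 1) + 1; missing < sortedOffsets.get(i); missing++) {
                voids.add(missing);
            }
        }
        return voids;
    }

    /** Offsets a compacted partition might return: 5, 6 and 7 were compacted away. */
    static List<Long> compactedOffsets() {
        List<Long> offsets = new ArrayList<>();
        for (long i = 0; i < 5; i++) {
            offsets.add(i);
        }
        for (long i = 8; i < 10; i++) {
            offsets.add(i);
        }
        return offsets;
    }

    public static void main(String[] args) {
        List<Long> offsets = compactedOffsets();
        long first = offsets.get(0);
        long last = offsets.get(offsets.size() - 1);
        // Subtracting the endpoints assumes a contiguous range and overcounts by the size of the voids.
        System.out.println("range size by subtraction: " + (last - first + 1)); // 10
        System.out.println("actual record count: " + offsets.size()); // 7
        System.out.println("voids: " + findVoids(offsets)); // [5, 6, 7]
    }
}
```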


[GitHub] storm pull request #2152: [STORM-2505] OffsetManager should account for offs...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2152#discussion_r121218000
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java ---
    @@ -0,0 +1,145 @@
    +            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
    +            spout.nextTuple();
    +
    +            committedOffsets=ArgumentCaptor.forClass(Map.class);
    --- End diff --
    
    Nit: You can use the Mock annotation to put generic parameters on this. See https://static.javadoc.io/org.mockito/mockito-core/2.8.9/org/mockito/Mockito.html#mock_annotation


[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for offset void...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2152
  
    It's probably fine since there hasn't been a release containing STORM-2505. It would just be confusing if the same issue showed up as fixed multiple times in https://github.com/apache/storm/blob/master/CHANGELOG.md


[GitHub] storm pull request #2152: [STORM-2505] OffsetManager should account for offs...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2152#discussion_r121218037
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java ---
    @@ -0,0 +1,145 @@
    +            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
    +            spout.nextTuple();
    +
    +            committedOffsets=ArgumentCaptor.forClass(Map.class);
    +            inOrder = inOrder(consumerMock);
    +            inOrder.verify(consumerMock).commitSync(committedOffsets.capture());
    +            inOrder.verify(consumerMock).poll(anyLong());
    +
    +            //verify that Offset 9 was last committed offset and no offsets are uncommitted
    +            assertTrue(committedOffsets.getValue().containsKey(partition));
    +            assertEquals(9, ((OffsetAndMetadata) (committedOffsets.getValue().get(partition))).offset());
    --- End diff --
    
    It would also allow you to avoid this cast.


[GitHub] storm pull request #2152: [STORM-2505] OffsetManager should account for offs...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2152#discussion_r127538268
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Copyright 2017 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.kafka.spout;
    +
    +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Matchers.*;
    +import static org.mockito.Mockito.*;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.clients.consumer.*;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
    +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.Time.SimulatedTime;
    +import org.junit.Test;
    +import org.mockito.ArgumentCaptor;
    +import org.mockito.Captor;
    +import org.mockito.InOrder;
    +import org.mockito.MockitoAnnotations;
    +
    +public class KafkaSpoutCommitTest {
    +
    +    private final long offsetCommitPeriodMs = 2_000;
    +    private final TopologyContext contextMock = mock(TopologyContext.class);
    +    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
    +    private final Map<String, Object> conf = new HashMap<>();
    +    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
    +    private KafkaConsumer<String, String> consumerMock;
    +    private KafkaSpout<String, String> spout;
    +    private KafkaSpoutConfig spoutConfig;
    +
    +    @Captor
    +    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
    +
    +    private void setupSpout(Set<TopicPartition> assignedPartitions) {
    +        MockitoAnnotations.initMocks(this);
    +        spoutConfig = getKafkaSpoutConfigBuilder(-1)
    +                .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
    +                .build();
    +
    +        consumerMock = mock(KafkaConsumer.class);
    +        KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
    +
    +        //Set up a spout listening to 1 topic partition
    +        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
    +
    +        spout.open(conf, contextMock, collectorMock);
    +        spout.activate();
    +
    +        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
    +        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
    +
    +        //Assign partitions to the spout
    +        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
    +        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
    +    }
    +
    +    @Test
    +    public void testCommitSuccessWithOffsetVoids() {
    +        //Verify that the commit logic can handle offset voids
    +        try (SimulatedTime simulatedTime = new SimulatedTime()) {
    +            setupSpout(Collections.singleton(partition));
    +            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
    +            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
    +            // Offsets emitted are 0,1,2,3,4,...,8,9
    --- End diff --
    
    Nit: This comment is a little confusing; the "..." would usually imply that 5, 6 and 7 are included.
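One way to resolve this nit is to spell out the emitted offsets as explicit ranges, e.g. `0-4, 8-9`, so the void at 5, 6 and 7 is obvious. The helper below is a hypothetical, stdlib-only sketch (not part of the PR or of Storm) that renders a sorted list of offsets in that form.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetRangeFormatSketch {
    /**
     * Renders sorted offsets as compact ranges, e.g. [0, 1, 2, 3, 4, 8, 9] -> "0-4, 8-9",
     * so voids are explicit instead of hidden behind "...". Hypothetical helper.
     */
    static String describeOffsets(List<Long> sortedOffsets) {
        List<String> ranges = new ArrayList<>();
        int i = 0;
        while (i < sortedOffsets.size()) {
            long start = sortedOffsets.get(i);
            long end = start;
            // Extend the current range while the next offset is contiguous.
            while (i + 1 < sortedOffsets.size() && sortedOffsets.get(i + 1) == end + 1) {
                end = sortedOffsets.get(++i);
            }
            ranges.add(start == end ? Long.toString(start) : start + "-" + end);
            i++;
        }
        return String.join(", ", ranges);
    }

    public static void main(String[] args) {
        List<Long> offsets = new ArrayList<>();
        for (long i = 0; i < 5; i++) {
            offsets.add(i);
        }
        for (long i = 8; i < 10; i++) {
            offsets.add(i);
        }
        System.out.println("Offsets emitted are " + describeOffsets(offsets)); // Offsets emitted are 0-4, 8-9
    }
}
```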


[GitHub] storm pull request #2152: [STORM-2505] OffsetManager should account for offs...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2152#discussion_r121221199
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -104,6 +104,10 @@ public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
             this.kafkaSpoutConfig = kafkaSpoutConfig;
         }
     
    +    long getNumUncommittedOffsets() {
    --- End diff --
    
    I'm wondering whether we can avoid inspecting this field directly in the test by unit testing OffsetManager, instead of testing it indirectly via KafkaSpout. Testing that the spout commits past gaps in the tuple stream is a good idea, but checking this field actually tests that OffsetManager.commit isn't buggy, which I feel belongs in a test for that class.
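A rough sketch of what testing the count directly could look like, assuming a heavily simplified, hypothetical `OffsetCountSketch` class (not Storm's `OffsetManager`, whose API and commit-offset semantics differ). It acks offsets 0 to 4 and 8 to 9, commits through 4 and then through 9, and compares the number of offsets actually committed with the endpoint-subtraction count described in the PR. Like the PR's test, it expects 5 and then 2 offsets to be committed, leaving nothing uncommitted.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical, simplified stand-in for an offset manager; not Storm's actual API. */
public class OffsetCountSketch {
    private final NavigableSet<Long> ackedOffsets = new TreeSet<>();
    private long committedOffset = -1; // -1 means nothing committed yet

    public void ack(long offset) {
        ackedOffsets.add(offset);
    }

    /** The buggy count: assumes every offset in (committedOffset, newCommittedOffset] exists. */
    public long naiveCount(long newCommittedOffset) {
        return newCommittedOffset - committedOffset;
    }

    /** Commits through newCommittedOffset and returns how many acked offsets were actually committed. */
    public long commit(long newCommittedOffset) {
        NavigableSet<Long> toCommit = ackedOffsets.headSet(newCommittedOffset, true);
        long count = toCommit.size();
        toCommit.clear(); // headSet is a live view, so this removes the offsets from ackedOffsets
        committedOffset = newCommittedOffset;
        return count;
    }

    public static void main(String[] args) {
        OffsetCountSketch manager = new OffsetCountSketch();
        long[] offsets = {0, 1, 2, 3, 4, 8, 9}; // 5, 6 and 7 form a void
        for (long offset : offsets) {
            manager.ack(offset);
        }
        long uncommitted = offsets.length;
        long first = manager.commit(4);
        uncommitted -= first;
        long naive = manager.naiveCount(9); // 9 - 4 = 5, overcounting by the 3 missing offsets
        long second = manager.commit(9);
        uncommitted -= second;
        System.out.println(first + " " + naive + " " + second + " " + uncommitted); // 5 5 2 0
    }
}
```

Subtracting the naive count instead of the real one would drive the uncommitted counter negative (2 - 5), which is the over-count the PR description refers to.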

