Posted to dev@storm.apache.org by ernisv <gi...@git.apache.org> on 2017/01/19 11:25:49 UTC

[GitHub] storm pull request #1888: STORM-2296 Kafka spout no dup on leader changes 1 ...

GitHub user ernisv opened a pull request:

    https://github.com/apache/storm/pull/1888

    STORM-2296 Kafka spout no dup on leader changes 1 x

    Currently, the Kafka spout emits duplicate tuples whenever the leader of a Kafka topic partition changes.
    When an exception is caused by a leader change, the PartitionManagers are simply recreated. This loses the state about which tuples were already emitted, so the new PartitionManager emits them again.

    This is acceptable because the at-least-once guarantee still holds, but it would be better not to emit duplicate data where possible.
    Moreover, this can easily be avoided by moving the state related to emitted tuples from the old PartitionManager to the new one.
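A minimal sketch of the idea, assuming a heavily simplified manager. The class name `SketchPartitionManager`, its constructors and all field types are hypothetical. Only the field names `_waitingToEmit`, `_emittedToOffset`, `_committedTo` and `_pending` come from this thread. A fresh manager resumes from the last committed offset, whereas the "recreate" constructor reuses the previous manager's in-flight state.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for storm-kafka's PartitionManager.
// Field names follow the PR discussion; the types are assumptions.
class SketchPartitionManager {
    final int partition;
    final String leaderHost;
    Deque<Long> _waitingToEmit = new ArrayDeque<>(); // offsets fetched but not yet emitted
    long _emittedToOffset;                            // next offset to fetch
    long _committedTo;                                // last committed offset
    Set<Long> _pending = new HashSet<>();             // emitted offsets awaiting ack

    // Fresh manager: resumes from the committed offset, so every tuple after it
    // is fetched and emitted again.
    SketchPartitionManager(int partition, String leaderHost, long committedTo) {
        this.partition = partition;
        this.leaderHost = leaderHost;
        this._committedTo = committedTo;
        this._emittedToOffset = committedTo;
    }

    // Recreated manager for the same partition after a leader change: the
    // in-flight state is moved over instead of being rebuilt from _committedTo.
    SketchPartitionManager(String newLeaderHost, SketchPartitionManager previous) {
        this.partition = previous.partition;
        this.leaderHost = newLeaderHost;
        this._waitingToEmit = previous._waitingToEmit;
        this._emittedToOffset = previous._emittedToOffset;
        this._committedTo = previous._committedTo;
        this._pending = previous._pending;
    }
}
```

Because the collections are moved rather than copied, the new manager keeps acking and retrying exactly the offsets the old one had in flight.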

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ernisv/storm kafka_spout_no_dup_on_leader_changes_1_x

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1888.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1888
    
----
commit fb9c3073f5babc35828abdcf897db31846cabecc
Author: Ernestas Vaiciukevicius <e....@adform.com>
Date:   2017-01-12T14:54:59Z

    Move state from old PartitionManager when recreating manager for same partition

commit aefd80a5404b726d1dd538018b4f7f0bca119627
Author: Ernestas Vaiciukevicius <e....@adform.com>
Date:   2017-01-12T15:39:51Z

    Test to check if old PartitionManager's state is moved to new manager during manager recreate

commit c744e4b7dcbca5082243a691d97f12cd4b1151c3
Author: Ernestas Vaiciukevicius <e....@adform.com>
Date:   2017-01-12T15:57:46Z

    Include _emittedToOffset when copying state during PartitionManager recreate

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1888: STORM-2296 Kafka spout no dup on leader changes, 1...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1888#discussion_r99283845
  
    --- Diff: external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---
    @@ -114,6 +113,33 @@ public void testPartitionsChange() throws Exception {
             }
         }
     
    +    @Test
    +    public void testPartitionManagerRecreate() throws Exception {
    +        final int totalTasks = 2;
    +        int partitionsPerTask = 2;
    +        List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask);
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9092)));
    +        List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList);
    +        waitForRefresh();
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093)));
    +        List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
    +        assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size());
    --- End diff --
    
    It compares exactly the same thing on both sides. Did you miss something here, or is it a redundant line?


[GitHub] storm pull request #1888: STORM-2296 Kafka spout no dup on leader changes, 1...

Posted by ernisv <gi...@git.apache.org>.
Github user ernisv commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1888#discussion_r99294012
  
    --- Diff: external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---
    @@ -114,6 +113,33 @@ public void testPartitionsChange() throws Exception {
             }
         }
     
    +    @Test
    +    public void testPartitionManagerRecreate() throws Exception {
    +        final int totalTasks = 2;
    +        int partitionsPerTask = 2;
    +        List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask);
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9092)));
    +        List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList);
    +        waitForRefresh();
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093)));
    +        List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
    +        assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size());
    --- End diff --
    
    Right, the same error exists in another test, and it got copy-pasted here :)
    Fixed in both places.
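To make the reviewer's point concrete: the quoted line compares partitionManagersAfterRefresh.size() with itself, so it can never fail. The small sketch below uses hypothetical helper names and contrasts that vacuous check with the presumably intended comparison of the "before" and "after" lists. The actual fix is not shown in this thread; in the JUnit test it would presumably read `assertEquals(partitionManagersBeforeRefresh.size(), partitionManagersAfterRefresh.size())`.

```java
import java.util.List;

// Hypothetical helpers illustrating the review comment; not the actual test code.
class RefreshSizeCheck {
    // The quoted assertion compares a value with itself, so it is always true.
    static boolean vacuousCheck(List<?> before, List<?> after) {
        return after.size() == after.size();
    }

    // Presumably intended: refreshing must not change the number of
    // per-coordinator manager lists. Argument order mirrors JUnit's
    // assertEquals(expected, actual).
    static void assertSameSize(List<?> expected, List<?> actual) {
        if (expected.size() != actual.size()) {
            throw new AssertionError("expected " + expected.size() + " lists but was " + actual.size());
        }
    }
}
```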


[GitHub] storm issue #1888: STORM-2296 Kafka spout no dup on leader changes, 1.x-bran...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1888
  
    @ernisv 
    Looks good, just minor nits. The build failure is unrelated.


[GitHub] storm issue #1888: STORM-2296 Kafka spout no dup on leader changes, 1.x-bran...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1888
  
    @ernisv 
    Could you squash your commits into one?
    `STORM-2296 Kafka spout no dup on leader changes` would be sufficient as the new commit title, and you can list the current commits' titles in the body.

    I can handle it if you prefer, but then you will need to close this PR manually after merging, since GitHub auto-close doesn't work against non-master branches.


[GitHub] storm pull request #1888: STORM-2296 Kafka spout no dup on leader changes, 1...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1888


[GitHub] storm pull request #1888: STORM-2296 Kafka spout no dup on leader changes, 1...

Posted by ernisv <gi...@git.apache.org>.
Github user ernisv commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1888#discussion_r99294074
  
    --- Diff: external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---
    @@ -114,6 +113,33 @@ public void testPartitionsChange() throws Exception {
             }
         }
     
    +    @Test
    +    public void testPartitionManagerRecreate() throws Exception {
    +        final int totalTasks = 2;
    +        int partitionsPerTask = 2;
    +        List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask);
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9092)));
    +        List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList);
    +        waitForRefresh();
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093)));
    +        List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
    +        assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size());
    +        Iterator<List<PartitionManager>> iterator = partitionManagersAfterRefresh.iterator();
    +        for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) {
    +            List<PartitionManager> partitionManagersAfter = iterator.next();
    +            for (PartitionManager before : partitionManagersBefore)
    --- End diff --
    
    Right, simplified the test.


[GitHub] storm pull request #1888: STORM-2296 Kafka spout no dup on leader changes, 1...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1888#discussion_r99284800
  
    --- Diff: external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---
    @@ -114,6 +113,33 @@ public void testPartitionsChange() throws Exception {
             }
         }
     
    +    @Test
    +    public void testPartitionManagerRecreate() throws Exception {
    +        final int totalTasks = 2;
    +        int partitionsPerTask = 2;
    +        List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask);
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9092)));
    +        List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList);
    +        waitForRefresh();
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093)));
    +        List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
    +        assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size());
    +        Iterator<List<PartitionManager>> iterator = partitionManagersAfterRefresh.iterator();
    +        for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) {
    +            List<PartitionManager> partitionManagersAfter = iterator.next();
    +            for (PartitionManager before : partitionManagersBefore)
    --- End diff --
    
    I'd rather create a new hash map keyed by partition for either the "before" or the "after" managers, and compare the other list against that map. The nesting depth at the `if` statement is already 4.
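A sketch of the reviewer's suggestion, assuming partition ids are unique across all coordinators (a single topic, as in this test). The managers after refresh are indexed by partition in a HashMap, and the "before" lists are then walked once with a direct lookup instead of a third nested loop plus an `if`. The method name `matchByPartition` and its functional parameters are hypothetical. In the real test, `partitionOf` would presumably be `m -> m.getPartition().partition` and `check` would be `assertStateIsTheSame`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.ToIntFunction;

// Hypothetical helper: pairs managers before/after refresh by partition id.
class PartitionMatching {
    static <M> int matchByPartition(List<List<M>> before, List<List<M>> after,
                                    ToIntFunction<M> partitionOf, BiConsumer<M, M> check) {
        // Flatten the "after" lists into a map keyed by partition id.
        Map<Integer, M> afterByPartition = new HashMap<>();
        for (List<M> managers : after) {
            for (M manager : managers) {
                afterByPartition.put(partitionOf.applyAsInt(manager), manager);
            }
        }
        // Look up each "before" manager directly; a missing partition is a failure.
        int matched = 0;
        for (List<M> managers : before) {
            for (M manager : managers) {
                int partition = partitionOf.applyAsInt(manager);
                M recreated = afterByPartition.get(partition);
                if (recreated == null) {
                    throw new AssertionError("no manager for partition " + partition + " after refresh");
                }
                check.accept(manager, recreated);
                matched++;
            }
        }
        return matched;
    }
}
```

Unlike the nested loops in the quoted diff, a partition that disappears after refresh now fails the test instead of being silently skipped.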


[GitHub] storm issue #1888: STORM-2296 Kafka spout no dup on leader changes, 1.x-bran...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1888
  
    +1


[GitHub] storm pull request #1888: STORM-2296 Kafka spout no dup on leader changes, 1...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1888#discussion_r99298185
  
    --- Diff: external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---
    @@ -114,6 +113,33 @@ public void testPartitionsChange() throws Exception {
             }
         }
     
    +    @Test
    +    public void testPartitionManagerRecreate() throws Exception {
    +        final int totalTasks = 2;
    +        int partitionsPerTask = 2;
    +        List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask);
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9092)));
    +        List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList);
    +        waitForRefresh();
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093)));
    +        List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
    +        assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size());
    +        Iterator<List<PartitionManager>> iterator = partitionManagersAfterRefresh.iterator();
    +        for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) {
    +            List<PartitionManager> partitionManagersAfter = iterator.next();
    +            for (PartitionManager before : partitionManagersBefore)
    +                for (PartitionManager after: partitionManagersAfter)
    +                    if (before.getPartition().partition == after.getPartition().partition)
    +                        assertStateIsTheSame(before, after);
    +        }
    +    }
    +
    +    private void assertStateIsTheSame(PartitionManager managerBefore, PartitionManager managerAfter) {
    +        // check if state was actually moved from old PartitionManager
    +        assertNotSame(managerBefore, managerAfter);
    +        assertSame(managerBefore._waitingToEmit, managerAfter._waitingToEmit);
    --- End diff --
    
    No, I think that's enough.


[GitHub] storm pull request #1888: STORM-2296 Kafka spout no dup on leader changes, 1...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1888#discussion_r99284895
  
    --- Diff: external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---
    @@ -114,6 +113,33 @@ public void testPartitionsChange() throws Exception {
             }
         }
     
    +    @Test
    +    public void testPartitionManagerRecreate() throws Exception {
    +        final int totalTasks = 2;
    +        int partitionsPerTask = 2;
    +        List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask);
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9092)));
    +        List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList);
    +        waitForRefresh();
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093)));
    +        List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
    +        assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size());
    +        Iterator<List<PartitionManager>> iterator = partitionManagersAfterRefresh.iterator();
    +        for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) {
    +            List<PartitionManager> partitionManagersAfter = iterator.next();
    +            for (PartitionManager before : partitionManagersBefore)
    +                for (PartitionManager after: partitionManagersAfter)
    +                    if (before.getPartition().partition == after.getPartition().partition)
    +                        assertStateIsTheSame(before, after);
    +        }
    +    }
    +
    +    private void assertStateIsTheSame(PartitionManager managerBefore, PartitionManager managerAfter) {
    +        // check if state was actually moved from old PartitionManager
    +        assertNotSame(managerBefore, managerAfter);
    +        assertSame(managerBefore._waitingToEmit, managerAfter._waitingToEmit);
    --- End diff --
    
    Let's check all the copied fields, not only _waitingToEmit.
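A sketch of what checking every moved field could look like. It assumes, as the quoted test does, that the fields are reachable from a same-package test. `RecreatableManager` and `StateAssertions` are hypothetical. The field types are guesses, and `_failedMsgRetryManager` is only an `Object` here. Mutable state should be the very same instance (assertSame semantics), while the primitive offsets can only be compared by value.

```java
import java.util.LinkedList;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical stand-in; field names follow the PR discussion, types are assumptions.
class RecreatableManager {
    LinkedList<Long> _waitingToEmit = new LinkedList<>();
    long _emittedToOffset;
    long _committedTo;
    SortedMap<Long, Long> _pending = new TreeMap<>();
    Object _failedMsgRetryManager = new Object();
}

class StateAssertions {
    static void assertStateIsTheSame(RecreatableManager before, RecreatableManager after) {
        check(before != after, "manager should have been recreated");
        // Mutable state must be the same instances (moved, not copied).
        check(before._waitingToEmit == after._waitingToEmit, "_waitingToEmit not moved");
        check(before._pending == after._pending, "_pending not moved");
        check(before._failedMsgRetryManager == after._failedMsgRetryManager, "_failedMsgRetryManager not moved");
        // Offsets are primitives, so compare them by value.
        check(before._emittedToOffset == after._emittedToOffset, "_emittedToOffset differs");
        check(before._committedTo == after._committedTo, "_committedTo differs");
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }
}
```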


[GitHub] storm pull request #1888: STORM-2296 Kafka spout no dup on leader changes, 1...

Posted by ernisv <gi...@git.apache.org>.
Github user ernisv commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1888#discussion_r99295805
  
    --- Diff: external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---
    @@ -114,6 +113,33 @@ public void testPartitionsChange() throws Exception {
             }
         }
     
    +    @Test
    +    public void testPartitionManagerRecreate() throws Exception {
    +        final int totalTasks = 2;
    +        int partitionsPerTask = 2;
    +        List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask);
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9092)));
    +        List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList);
    +        waitForRefresh();
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093)));
    +        List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
    +        assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size());
    +        Iterator<List<PartitionManager>> iterator = partitionManagersAfterRefresh.iterator();
    +        for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) {
    +            List<PartitionManager> partitionManagersAfter = iterator.next();
    +            for (PartitionManager before : partitionManagersBefore)
    +                for (PartitionManager after: partitionManagersAfter)
    +                    if (before.getPartition().partition == after.getPartition().partition)
    +                        assertStateIsTheSame(before, after);
    +        }
    +    }
    +
    +    private void assertStateIsTheSame(PartitionManager managerBefore, PartitionManager managerAfter) {
    +        // check if state was actually moved from old PartitionManager
    +        assertNotSame(managerBefore, managerAfter);
    +        assertSame(managerBefore._waitingToEmit, managerAfter._waitingToEmit);
    --- End diff --
    
    I've added checks for _emittedToOffset and _committedTo fields.
    However, _failedMsgRetryManager and _pending are private, so we either don't check them or increase the fields' visibility to package-private. Do you think that warrants a visibility change?
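One way to approach the visibility question without widening the fields themselves is to keep them private and add package-private accessors that same-package tests can call. This is a hypothetical sketch: `EncapsulatedManager`, `pendingForTest` and `failedMsgRetryManagerForTest` are invented names, and the field types are assumptions. The accessors still widen the class's package-level surface, which is exactly the trade-off being asked about.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch: fields stay private; package-private accessors expose
// them to tests in the same package only.
class EncapsulatedManager {
    private final SortedMap<Long, Long> _pending;
    private final Object _failedMsgRetryManager;

    EncapsulatedManager() {
        this(new TreeMap<>(), new Object());
    }

    // Recreate constructor: moves the previous manager's state.
    EncapsulatedManager(EncapsulatedManager previous) {
        this(previous._pending, previous._failedMsgRetryManager);
    }

    private EncapsulatedManager(SortedMap<Long, Long> pending, Object failedMsgRetryManager) {
        this._pending = pending;
        this._failedMsgRetryManager = failedMsgRetryManager;
    }

    // Package-private: not visible outside the package.
    SortedMap<Long, Long> pendingForTest() {
        return _pending;
    }

    Object failedMsgRetryManagerForTest() {
        return _failedMsgRetryManager;
    }
}
```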


[GitHub] storm issue #1888: STORM-2296 Kafka spout no dup on leader changes, 1.x-bran...

Posted by ernisv <gi...@git.apache.org>.
Github user ernisv commented on the issue:

    https://github.com/apache/storm/pull/1888
  
    OK, I've squashed all the changes into a single commit.

