Posted to dev@storm.apache.org by choojoyq <gi...@git.apache.org> on 2018/06/19 14:27:55 UTC

[GitHub] storm pull request #2726: STORM-3090 - Fix bug when different topics use the...

GitHub user choojoyq opened a pull request:

    https://github.com/apache/storm/pull/2726

    STORM-3090 - Fix bug when different topics use the same offset for a partition

    In the current implementation of `ZkCoordinator`, deleted partition managers are used as state holders for newly created partition managers. This behaviour was introduced in the scope of [this](https://issues-test.apache.org/jira/browse/STORM-2296) ticket. However, the existing lookup is based only on the partition number.
    
    ```
    Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
    for (Partition id : deletedPartitions) {
        deletedManagers.put(id.partition, _managers.remove(id));
    }
    for (PartitionManager manager : deletedManagers.values()) {
        if (manager != null) manager.close();
    }
    LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " New partition managers: " + newPartitions.toString());
    
    for (Partition id : newPartitions) {
        PartitionManager man = new PartitionManager(
            _connections,
            _topologyInstanceId,
            _state,
            _topoConf,
            _spoutConfig,
            id,
            deletedManagers.get(id.partition)); // looked up by partition number only; the topic is ignored
        _managers.put(id, man);
    }
    ```
    This is incorrect, because the same task can manage multiple partitions that share a partition number but belong to different topics. In that case, all new partition managers with that number obtain the same offset from an arbitrary deleted partition manager: entries with the same partition number overwrite each other in the `HashMap`, so which one survives depends on iteration order. Fetch requests for those managers then fail with `TopicOffsetOutOfRangeException` whenever the inherited offset lies outside the topic's valid range. Some of them recover through the logic below if the assigned offset is smaller than the real one, but the others keep failing with the offset-out-of-range exception, which prevents them from fetching messages from Kafka.
    ```
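    // offset: the offset re-read from Kafka after the TopicOffsetOutOfRangeException;
    // the manager only recovers if its stale offset is below this value
    if (offset > _emittedToOffset) {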
        _lostMessageCount.incrBy(offset - _emittedToOffset);
        _emittedToOffset = offset;
        LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
    }
    ```
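    A minimal simulation of the recovery branch above, assuming the offset re-read after `TopicOffsetOutOfRangeException` is the earliest available offset. `OffsetRecoveryDemo`, `fetchOnce` and `stuckAfter` are hypothetical names, not storm-kafka APIs. It shows why an inherited offset below the valid range recovers while one above it stays stuck:

    ```java
    // Hypothetical simulation of the recovery branch; not storm-kafka code.
    final class OffsetRecoveryDemo {

        /**
         * One fetch attempt. If emittedToOffset is outside [earliest, latest], the
         * fetch "fails" and an offset is re-read (assumed: the earliest offset).
         * The emitted offset only moves when that re-read offset is greater.
         */
        static long fetchOnce(long emittedToOffset, long earliest, long latest) {
            boolean outOfRange = emittedToOffset < earliest || emittedToOffset > latest;
            if (!outOfRange) {
                return emittedToOffset; // fetch succeeds, nothing to recover
            }
            long offset = earliest; // assumption: reset target is the earliest offset
            if (offset > emittedToOffset) {
                emittedToOffset = offset; // stale offset below the range: jump forward
            }
            return emittedToOffset; // stale offset above the range: unchanged
        }

        /** True if the partition is still out of range after the given attempts. */
        static boolean stuckAfter(int attempts, long emitted, long earliest, long latest) {
            for (int i = 0; i < attempts; i++) {
                emitted = fetchOnce(emitted, earliest, latest);
            }
            return emitted < earliest || emitted > latest;
        }

        public static void main(String[] args) {
            // Hypothetical valid range of the partition: 500..800.
            System.out.println(stuckAfter(3, 100L, 500L, 800L));   // false: recovers
            System.out.println(stuckAfter(3, 5_000L, 500L, 800L)); // true: stays stuck
        }
    }
    ```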
    I assume that the state-holder lookup should be based on both the topic and the partition number.
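    A minimal sketch of the difference between the two keyings, using a hypothetical `TopicPartition` record in place of Storm's `Partition`/`TopicAndPartition` and a `Long` offset in place of the deleted `PartitionManager`:

    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical stand-ins: TopicPartition for Storm's partition types,
    // Long offsets for the deleted PartitionManagers used as state holders.
    final class StateHolderLookupDemo {

        record TopicPartition(String topic, int partition) {}

        /** Old behaviour: state holders keyed by partition number only. */
        static Map<Integer, Long> byPartitionNumber(Map<TopicPartition, Long> deleted) {
            Map<Integer, Long> holders = new HashMap<>();
            for (Map.Entry<TopicPartition, Long> e : deleted.entrySet()) {
                // Entries with the same partition number overwrite each other.
                holders.put(e.getKey().partition(), e.getValue());
            }
            return holders;
        }

        /** Fixed behaviour: state holders keyed by topic and partition number. */
        static Map<TopicPartition, Long> byTopicAndPartition(Map<TopicPartition, Long> deleted) {
            return new HashMap<>(deleted);
        }

        public static void main(String[] args) {
            Map<TopicPartition, Long> deleted = new HashMap<>();
            deleted.put(new TopicPartition("topic1", 1), 100L);
            deleted.put(new TopicPartition("topic2", 1), 200L);

            TopicPartition fresh = new TopicPartition("topic3", 1);
            // Old lookup: topic3 inherits 100 or 200, depending on iteration order.
            System.out.println(byPartitionNumber(deleted).get(fresh.partition()));
            // Fixed lookup: no state for topic3, so it starts from scratch.
            System.out.println(byTopicAndPartition(deleted).get(fresh)); // null
        }
    }
    ```

    With the old key, both deleted entries collapse into a single map entry, so an unrelated topic inherits whichever one survived.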

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/choojoyq/storm STORM-3090

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2726.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2726
    
----
commit 808e6be3edc7f83c80ad2ad4fb0a11091e81cc66
Author: Nikita Gorbachevsky <ni...@...>
Date:   2018-06-19T14:02:28Z

    STORM-3090 - use topic together with partition number during recreation of partition managers

----


---

[GitHub] storm pull request #2726: STORM-3090 - Fix bug when different topics use the...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2726


---

[GitHub] storm issue #2726: STORM-3090 - Fix bug when different topics use the same o...

Posted by choojoyq <gi...@git.apache.org>.
Github user choojoyq commented on the issue:

    https://github.com/apache/storm/pull/2726
  
    @HeartSaVioR 
    Thanks for the fast reply. I pointed this pull request to 1.x-branch. 
    Does it make sense to open one more pull request for 1.0.x-branch as well?


---

[GitHub] storm pull request #2726: STORM-3090 - Fix bug when different topics use the...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2726#discussion_r200657323
  
    --- Diff: external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---
    @@ -140,6 +140,42 @@ public void testPartitionManagerRecreate() throws Exception {
             }
         }
     
    +    @Test
    +    public void testPartitionManagerRecreateMultipleTopics() throws Exception {
    +        List<ZkCoordinator> coordinatorList = buildCoordinators(1);
    +
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(
    +            TestUtils.buildPartitionInfo("topic1", 1, 9092),
    +            TestUtils.buildPartitionInfo("topic2", 1, 9092)));
    +
    +        List<PartitionManager> partitionManagersBeforeRefresh = coordinatorList.get(0).getMyManagedPartitions();
    +        assertEquals(2, partitionManagersBeforeRefresh.size());
    +        for (PartitionManager partitionManager : partitionManagersBeforeRefresh) {
    +            partitionManager._emittedToOffset = 100L;
    +            partitionManager._committedTo = 100L;
    +        }
    +
    +        waitForRefresh();
    +
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(
    +            TestUtils.buildPartitionInfo("topic1", 1, 9093),
    +            TestUtils.buildPartitionInfo("topic3", 1, 9093)));
    --- End diff --
    
    Thanks, that makes sense. Since there's only one task, topic 2 can't be removed, though. It's not really important; the test makes sense regardless.


---

[GitHub] storm issue #2726: STORM-3090 - Fix bug when different topics use the same o...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2726
  
    Is this ready to merge, @choojoyq?


---

[GitHub] storm pull request #2726: STORM-3090 - Fix bug when different topics use the...

Posted by choojoyq <gi...@git.apache.org>.
Github user choojoyq commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2726#discussion_r200651516
  
    --- Diff: external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java ---
    @@ -107,14 +110,14 @@ public void refresh() {
                             _stormConf,
                             _spoutConfig,
                             id,
    -                        deletedManagers.get(id.partition));
    +                        deletedManagers.get(new TopicAndPartition(id.topic, id.partition)));
    --- End diff --
    
    I believe this lookup exists to preserve information about partitions that move between brokers during rebalancing. If, for example, one of the brokers fails and its partitions are reassigned to other brokers, this lookup should find the information about such a partition and reuse it when the same partition is recreated on another host. ``equals()`` and ``hashCode()`` in Partition are based on topic, partition and host, while this lookup is based only on topic and partition.
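    A small sketch of that reasoning, with hypothetical `Partition` and `TopicPartition` records (a record's `equals()`/`hashCode()` cover all of its components, which mirrors the host-aware equality described above). It computes the (topic, partition) pairs that appear both in curr - mine and in mine - curr; only those can find a state holder, and they exist only when a partition changed hosts:

    ```java
    import java.util.HashSet;
    import java.util.Set;
    import java.util.stream.Collectors;

    // Hypothetical model; Partition equality covers topic, partition and host.
    final class MovedPartitionDemo {

        record Partition(String topic, int partition, String host) {}

        record TopicPartition(String topic, int partition) {}

        /** (topic, partition) pairs present in both curr - mine and mine - curr. */
        static Set<TopicPartition> lookupHits(Set<Partition> curr, Set<Partition> mine) {
            Set<Partition> deleted = new HashSet<>(curr);
            deleted.removeAll(mine);                 // curr - mine
            Set<Partition> created = new HashSet<>(mine);
            created.removeAll(curr);                 // mine - curr

            Set<TopicPartition> deletedKeys = deleted.stream()
                .map(p -> new TopicPartition(p.topic(), p.partition()))
                .collect(Collectors.toSet());
            return created.stream()
                .map(p -> new TopicPartition(p.topic(), p.partition()))
                .filter(deletedKeys::contains)
                .collect(Collectors.toSet());
        }

        public static void main(String[] args) {
            Set<Partition> curr = Set.of(new Partition("topic1", 1, "localhost:9092"));
            // Same topic and partition, now hosted by another broker: a lookup hit.
            System.out.println(lookupHits(curr, Set.of(new Partition("topic1", 1, "localhost:9093"))).size()); // 1
            // Partition handed to another task: deleted but not recreated, no hit.
            System.out.println(lookupHits(curr, Set.of()).size()); // 0
        }
    }
    ```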


---

[GitHub] storm pull request #2726: STORM-3090 - Fix bug when different topics use the...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2726#discussion_r200653436
  
    --- Diff: external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java ---
    @@ -107,14 +110,14 @@ public void refresh() {
                             _stormConf,
                             _spoutConfig,
                             id,
    -                        deletedManagers.get(id.partition));
    +                        deletedManagers.get(new TopicAndPartition(id.topic, id.partition)));
    --- End diff --
    
    Good point, never mind then.


---

[GitHub] storm issue #2726: STORM-3090 - Fix bug when different topics use the same o...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2726
  
    @choojoyq
    First of all, thanks for your contribution. We are deprecating storm-kafka as of the 1.x line and plan to remove it entirely in 2.0 (there are a couple of pull requests to be reviewed before that, though).
    
    So you may need to submit this PR against 1.x-branch.


---

[GitHub] storm issue #2726: STORM-3090 - Fix bug when different topics use the same o...

Posted by choojoyq <gi...@git.apache.org>.
Github user choojoyq commented on the issue:

    https://github.com/apache/storm/pull/2726
  
    @HeartSaVioR 
    Hi, do you happen to know the release date of the next Storm version that this pull request could be merged into? We hit this issue fairly often in our cluster, and each time we have to restart the Storm topology with the ``ignoreZkOffsets`` flag, so we are considering a temporary workaround: releasing our own build of the ``storm-kafka`` module if the next official version is not released soon. Thanks.


---

[GitHub] storm issue #2726: STORM-3090 - Fix bug when different topics use the same o...

Posted by choojoyq <gi...@git.apache.org>.
Github user choojoyq commented on the issue:

    https://github.com/apache/storm/pull/2726
  
    @srdo I think so, thanks for the review.


---

[GitHub] storm pull request #2726: STORM-3090 - Fix bug when different topics use the...

Posted by choojoyq <gi...@git.apache.org>.
Github user choojoyq commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2726#discussion_r200655178
  
    --- Diff: external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---
    @@ -140,6 +140,42 @@ public void testPartitionManagerRecreate() throws Exception {
             }
         }
     
    +    @Test
    +    public void testPartitionManagerRecreateMultipleTopics() throws Exception {
    +        List<ZkCoordinator> coordinatorList = buildCoordinators(1);
    +
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(
    +            TestUtils.buildPartitionInfo("topic1", 1, 9092),
    +            TestUtils.buildPartitionInfo("topic2", 1, 9092)));
    +
    +        List<PartitionManager> partitionManagersBeforeRefresh = coordinatorList.get(0).getMyManagedPartitions();
    +        assertEquals(2, partitionManagersBeforeRefresh.size());
    +        for (PartitionManager partitionManager : partitionManagersBeforeRefresh) {
    +            partitionManager._emittedToOffset = 100L;
    +            partitionManager._committedTo = 100L;
    +        }
    +
    +        waitForRefresh();
    +
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(
    +            TestUtils.buildPartitionInfo("topic1", 1, 9093),
    +            TestUtils.buildPartitionInfo("topic3", 1, 9093)));
    --- End diff --
    
    The idea of the test is that before rebalancing, a ``Task`` was responsible for two partitions, one from topic1 and one from topic2, both on broker 9092. After the Kafka brokers were rebalanced, it became responsible for the same partition of topic1 and for another partition from topic3, both now hosted on broker 9093. The ``Task`` should therefore preserve the information about the topic1 partition, since it managed that partition before and knows how many messages were already emitted, committed, etc. The partition from topic3, on the other hand, is brand new, so it should be created from scratch without reusing any information. Before the fix, information from a previously managed partition (topic1 or topic2) would be reused for topic3, because the lookup of previously managed partitions used only the partition number and ignored the topic.
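    The scenario can be replayed with plain collections. This is a hypothetical sketch, not the actual test: `RecreateScenarioDemo`, `inherit`, `OLD_KEY` and `NEW_KEY` are illustrative names, and ports stand in for brokers. Giving topic1 and topic2 different offsets (100 and 200) makes an overwrite visible:

    ```java
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.function.Function;

    // Hypothetical replay of the test scenario; not storm-kafka code.
    final class RecreateScenarioDemo {

        record Partition(String topic, int partition, int port) {}

        static final Function<Partition, Object> OLD_KEY = Partition::partition;
        static final Function<Partition, Object> NEW_KEY = p -> Map.entry(p.topic(), p.partition());

        /** Offsets inherited by partitions in next that were not in prev (null = fresh). */
        static Map<Partition, Long> inherit(Map<Partition, Long> prev, Set<Partition> next,
                                            Function<Partition, Object> key) {
            Map<Object, Long> holders = new HashMap<>();
            prev.forEach((p, offset) -> {
                if (!next.contains(p)) {
                    holders.put(key.apply(p), offset); // deleted manager kept as state holder
                }
            });
            Map<Partition, Long> result = new HashMap<>();
            for (Partition p : next) {
                if (!prev.containsKey(p)) {
                    result.put(p, holders.get(key.apply(p)));
                }
            }
            return result;
        }

        public static void main(String[] args) {
            Map<Partition, Long> before = Map.of(
                new Partition("topic1", 1, 9092), 100L,
                new Partition("topic2", 1, 9092), 200L);
            Partition topic1 = new Partition("topic1", 1, 9093);
            Partition topic3 = new Partition("topic3", 1, 9093);
            Set<Partition> after = Set.of(topic1, topic3);

            Map<Partition, Long> oldResult = inherit(before, after, OLD_KEY);
            Map<Partition, Long> newResult = inherit(before, after, NEW_KEY);
            // Old keying: topic1 and topic3 share one inherited offset (100 or 200).
            System.out.println("old: topic1=" + oldResult.get(topic1) + ", topic3=" + oldResult.get(topic3));
            System.out.println("new: topic1=" + newResult.get(topic1) + ", topic3=" + newResult.get(topic3)); // new: topic1=100, topic3=null
        }
    }
    ```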


---

[GitHub] storm issue #2726: STORM-3090 - Fix bug when different topics use the same o...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2726
  
    @choojoyq Let us port back first if it is OK to be merged.


---

[GitHub] storm issue #2726: STORM-3090 - Fix bug when different topics use the same o...

Posted by choojoyq <gi...@git.apache.org>.
Github user choojoyq commented on the issue:

    https://github.com/apache/storm/pull/2726
  
    @HeartSaVioR Sure, thanks.


---

[GitHub] storm pull request #2726: STORM-3090 - Fix bug when different topics use the...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2726#discussion_r200456703
  
    --- Diff: external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---
    @@ -140,6 +140,42 @@ public void testPartitionManagerRecreate() throws Exception {
             }
         }
     
    +    @Test
    +    public void testPartitionManagerRecreateMultipleTopics() throws Exception {
    +        List<ZkCoordinator> coordinatorList = buildCoordinators(1);
    +
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(
    +            TestUtils.buildPartitionInfo("topic1", 1, 9092),
    +            TestUtils.buildPartitionInfo("topic2", 1, 9092)));
    +
    +        List<PartitionManager> partitionManagersBeforeRefresh = coordinatorList.get(0).getMyManagedPartitions();
    +        assertEquals(2, partitionManagersBeforeRefresh.size());
    +        for (PartitionManager partitionManager : partitionManagersBeforeRefresh) {
    +            partitionManager._emittedToOffset = 100L;
    +            partitionManager._committedTo = 100L;
    +        }
    +
    +        waitForRefresh();
    +
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(
    +            TestUtils.buildPartitionInfo("topic1", 1, 9093),
    +            TestUtils.buildPartitionInfo("topic3", 1, 9093)));
    --- End diff --
    
    Nit: It seems weird to exclude topic 2; that can't happen in a real setup. Shouldn't you be checking that topic1 and topic2 can't overwrite each other's offsets instead (e.g. by giving them different offsets a few lines up)?


---

[GitHub] storm issue #2726: STORM-3090 - Fix bug when different topics use the same o...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2726
  
    The changes look good to me, +1. Left some comments, but I don't think it makes sense to put too much effort into making the tests more realistic, or refactoring, given that storm-kafka is being removed.


---

[GitHub] storm pull request #2726: STORM-3090 - Fix bug when different topics use the...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2726#discussion_r200456835
  
    --- Diff: external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java ---
    @@ -107,14 +110,14 @@ public void refresh() {
                             _stormConf,
                             _spoutConfig,
                             id,
    -                        deletedManagers.get(id.partition));
    +                        deletedManagers.get(new TopicAndPartition(id.topic, id.partition)));
    --- End diff --
    
    Are you sure this lookup makes sense? deletedManagers only contains partitions that this task is no longer responsible for (curr - mine). Such a partition shouldn't also be able to be in newPartitions (mine - curr). Won't this lookup always return null (unless you're hitting the bug that this PR is trying to fix, namely that offsets are copied from the PartitionManager for e.g. topic1 to the PartitionManager for topic2)?


---