You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@storm.apache.org by "Jing Chen (JIRA)" <ji...@apache.org> on 2018/09/07 09:19:00 UTC

[jira] [Commented] (STORM-3214) 使用 kafka.topic.wildcard.match =true的时候,ZkCoordinator.refresh中deletedManagers会出现逻辑错误

    [ https://issues.apache.org/jira/browse/STORM-3214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16606876#comment-16606876 ] 

Jing Chen commented on STORM-3214:
----------------------------------

[~sj] thanks for the catch, I believe it has been fixed at 1.1.x-branch, please have a look at:

[https://github.com/apache/storm/blob/1.1.x-branch/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java#L93]

Also, storm-kafka has been deprecated and will be removed in a future storm release after 1.x version. Please upgrade to storm-kafka-client.

FYI, [https://github.com/apache/storm/tree/master/external/storm-kafka-client]

 

> 使用 kafka.topic.wildcard.match =true的时候,ZkCoordinator.refresh中deletedManagers会出现逻辑错误
> -----------------------------------------------------------------------------------
>
>                 Key: STORM-3214
>                 URL: https://issues.apache.org/jira/browse/STORM-3214
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka
>    Affects Versions: 1.1.3, 1.2.2
>            Reporter: shusj
>            Priority: Major
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> 使用 kafka.topic.wildcard.match =true的时候,如果topic数目大于1,ZkCoordinator.refresh中deletedManagers会出现逻辑错误
> 只需要将ZkCoordinator@L91:    Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
> 将key修改为topic+partition
>  
> 在org.apache.storm.kafka.ZkCoordinatorTest中添加了如下测试
> {code:java}
> //代码占位符
> public static GlobalPartitionInformation buildPartitionInfo(int numPartitions, int brokerPort, String topic) {
> GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(topic);
> for (int i = 0; i < numPartitions; i++) {
> globalPartitionInformation.addPartition(i, Broker.fromString("broker-" + i + " :" + brokerPort));
> }
> return globalPartitionInformation;
> }
> @Test
> public void testTwoTopicPartitionsChange() throws Exception {
> int numPartitions = 2;
> int partitionsPerTask = 1;
> final Set<Partition> unregisterList = new HashSet<>();
> Mockito.doAnswer(new Answer() {
> @Override
> public Object answer(InvocationOnMock invocation) throws Throwable {
> Object[] arguments = invocation.getArguments();
> Partition partition = new Partition((Broker) arguments[0], (String) arguments[1], (int) arguments[2], false);
> unregisterList.add(partition);
> return null;
> }
> }).when(dynamicPartitionConnections).unregister(any(Broker.class), any(String.class), anyInt());
> List<ZkCoordinator> coordinatorList = buildCoordinators(partitionsPerTask);
> ArrayList<GlobalPartitionInformation> prePartitionInformations = Lists.newArrayList(buildPartitionInfo(numPartitions, 9092, "TOPIC1"), 
> buildPartitionInfo(numPartitions, 9092, "TOPIC2"));
> when(reader.getBrokerInfo()).thenReturn(prePartitionInformations);
> List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList);
> waitForRefresh();
> when(reader.getBrokerInfo()).thenReturn(Lists.newArrayList(buildPartitionInfo(numPartitions, 9093, "TOPIC1"), buildPartitionInfo(numPartitions, 9093, "TOPIC2")));
> List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
> List<Partition> allPrePartition = KafkaUtils.calculatePartitionsForTask(prePartitionInformations, 1, 0, 0);
> assertEquals(unregisterList.size(), allPrePartition.size());
> for (Partition partition : allPrePartition) {
> assertTrue(unregisterList.contains(partition));
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)