You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2018/07/06 16:03:25 UTC

[1/2] storm git commit: STORM-3090 - use topic together with partition number during recreation of partition managers

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 9938ed357 -> 83e5788c0


STORM-3090 - use topic together with partition number during recreation of partition managers


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/35797bf1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/35797bf1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/35797bf1

Branch: refs/heads/1.x-branch
Commit: 35797bf1d2bc9dc9a382d2ab7078e6ef33237810
Parents: e694676
Author: Nikita Gorbachevsky <ni...@playtika.com>
Authored: Wed Jun 20 13:18:36 2018 +0300
Committer: Nikita Gorbachevsky <ni...@playtika.com>
Committed: Wed Jun 20 13:18:36 2018 +0300

----------------------------------------------------------------------
 .../jvm/org/apache/storm/kafka/KafkaSpout.java  | 23 +++++++------
 .../apache/storm/kafka/PartitionManager.java    |  6 ++--
 .../org/apache/storm/kafka/ZkCoordinator.java   | 29 +++++++++-------
 .../test/org/apache/storm/kafka/TestUtils.java  | 14 ++++----
 .../apache/storm/kafka/ZkCoordinatorTest.java   | 36 ++++++++++++++++++++
 5 files changed, 75 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/35797bf1/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
index 5973932..4c307b8 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
@@ -162,15 +162,6 @@ public class KafkaSpout extends BaseRichSpout {
         }
     }
 
-    private PartitionManager getManagerForPartition(int partition) {
-        for (PartitionManager partitionManager: _coordinator.getMyManagedPartitions()) {
-            if (partitionManager.getPartition().partition == partition) {
-                return partitionManager;
-            }
-        }
-        return null;
-    }
-
     @Override
     public void ack(Object msgId) {
         KafkaMessageId id = (KafkaMessageId) msgId;
@@ -179,7 +170,7 @@ public class KafkaSpout extends BaseRichSpout {
             m.ack(id.offset);
         } else {
             // managers for partitions changed - try to find new manager responsible for that partition
-            PartitionManager newManager = getManagerForPartition(id.partition.partition);
+            PartitionManager newManager = tryToFindNewManager(id.partition);
             if (newManager != null) {
                 newManager.ack(id.offset);
             }
@@ -194,7 +185,7 @@ public class KafkaSpout extends BaseRichSpout {
             m.fail(id.offset);
         } else {
             // managers for partitions changed - try to find new manager responsible for that partition
-            PartitionManager newManager = getManagerForPartition(id.partition.partition);
+            PartitionManager newManager = tryToFindNewManager(id.partition);
             if (newManager != null) {
                 newManager.fail(id.offset);
             }
@@ -256,6 +247,16 @@ public class KafkaSpout extends BaseRichSpout {
         return configuration;
     }
 
+    private PartitionManager tryToFindNewManager(Partition partition) {
+        for (PartitionManager partitionManager : _coordinator.getMyManagedPartitions()) {
+            if (partitionManager.getPartition().partition == partition.partition
+                && partitionManager.getPartition().topic.equals(partition.topic)) {
+                return partitionManager;
+            }
+        }
+        return null;
+    }
+
     private void commit() {
         _lastUpdateMs = System.currentTimeMillis();
         for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/35797bf1/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index c025e3b..54beb83 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -104,9 +104,9 @@ public class PartitionManager {
             _emittedToOffset = previousManager._emittedToOffset;
             _waitingToEmit = previousManager._waitingToEmit;
             _pending = previousManager._pending;
-            LOG.info("Recreating PartitionManager based on previous manager, _waitingToEmit size: {}, _pending size: {}",
-                    _waitingToEmit.size(),
-                    _pending.size());
+            LOG.info("Recreating PartitionManager based on previous manager, host: {}, topic: {}, partition: {}, "
+                    + "_committedTo: {}, _emittedToOffset: {}, _waitingToEmit size: {}, _pending size: {} ",
+                id.host, id.topic, id.partition, _committedTo, _emittedToOffset, _waitingToEmit.size(), _pending.size());
         } else {
             try {
                 _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();

http://git-wip-us.apache.org/repos/asf/storm/blob/35797bf1/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
index d9dbfb3..10bef8e 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.kafka;
 
+import kafka.common.TopicAndPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.storm.kafka.trident.GlobalPartitionInformation;
@@ -33,8 +34,8 @@ public class ZkCoordinator implements PartitionCoordinator {
     int _totalTasks;
     int _taskId;
     String _topologyInstanceId;
-    Map<Partition, PartitionManager> _managers = new HashMap();
-    List<PartitionManager> _cachedList = new ArrayList<PartitionManager>();
+    Map<Partition, PartitionManager> _managers = new HashMap<>();
+    List<PartitionManager> _cachedList = new ArrayList<>();
     Long _lastRefreshTime = null;
     int _refreshFreqMs;
     DynamicPartitionConnections _connections;
@@ -82,22 +83,24 @@ public class ZkCoordinator implements PartitionCoordinator {
             List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex, _taskId);
 
             Set<Partition> curr = _managers.keySet();
-            Set<Partition> newPartitions = new HashSet<Partition>(mine);
+            Set<Partition> newPartitions = new HashSet<>(mine);
             newPartitions.removeAll(curr);
 
-            Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
+            Set<Partition> deletedPartitions = new HashSet<>(curr);
             deletedPartitions.removeAll(mine);
 
-            LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Deleted partition managers: " + deletedPartitions.toString());
+            LOG.info("{} Deleted partition managers: {}",
+                taskPrefix(_taskIndex, _totalTasks, _taskId), deletedPartitions);
 
-            Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
+            Map<TopicAndPartition, PartitionManager> deletedManagers = new HashMap<>();
             for (Partition id : deletedPartitions) {
-                deletedManagers.put(id.partition, _managers.remove(id));
+                PartitionManager manager = _managers.remove(id);
+                manager.close();
+                deletedManagers.put(new TopicAndPartition(id.topic, id.partition), manager);
             }
-            for (PartitionManager manager : deletedManagers.values()) {
-                if (manager != null) manager.close();
-            }
-            LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " New partition managers: " + newPartitions.toString());
+
+            LOG.info("{} New partition managers: {}",
+                taskPrefix(_taskIndex, _totalTasks, _taskId), newPartitions);
 
             for (Partition id : newPartitions) {
                 PartitionManager man = new PartitionManager(
@@ -107,14 +110,14 @@ public class ZkCoordinator implements PartitionCoordinator {
                         _stormConf,
                         _spoutConfig,
                         id,
-                        deletedManagers.get(id.partition));
+                        deletedManagers.get(new TopicAndPartition(id.topic, id.partition)));
                 _managers.put(id, man);
             }
 
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
-        _cachedList = new ArrayList<PartitionManager>(_managers.values());
+        _cachedList = new ArrayList<>(_managers.values());
         LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Finished refreshing");
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/35797bf1/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java b/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java
index cc3f2be..cfc6f41 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java
@@ -40,20 +40,22 @@ public class TestUtils {
         return buildPartitionInfo(numPartitions, 9092);
     }
 
-    public static List<GlobalPartitionInformation> buildPartitionInfoList(GlobalPartitionInformation partitionInformation) {
-        List<GlobalPartitionInformation> map = new ArrayList<GlobalPartitionInformation>();
-        map.add(partitionInformation);
-        return map;
+    public static List<GlobalPartitionInformation> buildPartitionInfoList(GlobalPartitionInformation... partitionInformation) {
+        return new ArrayList<>(Arrays.asList(partitionInformation));
     }
 
-    public static GlobalPartitionInformation buildPartitionInfo(int numPartitions, int brokerPort) {
-        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TOPIC);
+    public static GlobalPartitionInformation buildPartitionInfo(String topic, int numPartitions, int brokerPort) {
+        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(topic);
         for (int i = 0; i < numPartitions; i++) {
             globalPartitionInformation.addPartition(i, Broker.fromString("broker-" + i + " :" + brokerPort));
         }
         return globalPartitionInformation;
     }
 
+    public static GlobalPartitionInformation buildPartitionInfo(int numPartitions, int brokerPort) {
+        return buildPartitionInfo(TOPIC, numPartitions, brokerPort);
+    }
+
     public static SimpleConsumer getKafkaConsumer(KafkaTestBroker broker) {
         BrokerHosts brokerHosts = getBrokerHosts(broker);
         KafkaConfig kafkaConfig = new KafkaConfig(brokerHosts, TOPIC);

http://git-wip-us.apache.org/repos/asf/storm/blob/35797bf1/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
index 0b86845..64b9a9b 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
@@ -140,6 +140,42 @@ public class ZkCoordinatorTest {
         }
     }
 
+    @Test
+    public void testPartitionManagerRecreateMultipleTopics() throws Exception {
+        List<ZkCoordinator> coordinatorList = buildCoordinators(1);
+
+        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(
+            TestUtils.buildPartitionInfo("topic1", 1, 9092),
+            TestUtils.buildPartitionInfo("topic2", 1, 9092)));
+
+        List<PartitionManager> partitionManagersBeforeRefresh = coordinatorList.get(0).getMyManagedPartitions();
+        assertEquals(2, partitionManagersBeforeRefresh.size());
+        for (PartitionManager partitionManager : partitionManagersBeforeRefresh) {
+            partitionManager._emittedToOffset = 100L;
+            partitionManager._committedTo = 100L;
+        }
+
+        waitForRefresh();
+
+        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(
+            TestUtils.buildPartitionInfo("topic1", 1, 9093),
+            TestUtils.buildPartitionInfo("topic3", 1, 9093)));
+
+        List<PartitionManager> partitionManagersAfterRefresh = coordinatorList.get(0).getMyManagedPartitions();
+        assertEquals(2, partitionManagersAfterRefresh.size());
+        for (PartitionManager partitionManager : partitionManagersAfterRefresh) {
+            if (partitionManager.getPartition().topic.equals("topic1")) {
+                assertEquals(100, partitionManager._emittedToOffset.longValue());
+                assertEquals(100, partitionManager._committedTo.longValue());
+            }
+            if (partitionManager.getPartition().topic.equals("topic3")) {
+                // NO_OFFSET from KafkaUtils
+                assertEquals(-5, partitionManager._emittedToOffset.longValue());
+                assertEquals(-5, partitionManager._committedTo.longValue());
+            }
+        }
+    }
+
     private void assertStateIsTheSame(PartitionManager managerBefore, PartitionManager managerAfter) {
         // check if state was actually moved from old PartitionManager
         assertNotNull(managerBefore);


[2/2] storm git commit: Merge branch 'STORM-3090' of https://github.com/choojoyq/storm into asfgit-1.x-branch

Posted by sr...@apache.org.
Merge branch 'STORM-3090' of https://github.com/choojoyq/storm into asfgit-1.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/83e5788c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/83e5788c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/83e5788c

Branch: refs/heads/1.x-branch
Commit: 83e5788c06863cbf4ad336dbfb142aa237b0cba0
Parents: 9938ed3 35797bf
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Fri Jul 6 17:42:20 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Fri Jul 6 17:42:20 2018 +0200

----------------------------------------------------------------------
 .../jvm/org/apache/storm/kafka/KafkaSpout.java  | 23 +++++++------
 .../apache/storm/kafka/PartitionManager.java    |  6 ++--
 .../org/apache/storm/kafka/ZkCoordinator.java   | 29 +++++++++-------
 .../test/org/apache/storm/kafka/TestUtils.java  | 14 ++++----
 .../apache/storm/kafka/ZkCoordinatorTest.java   | 36 ++++++++++++++++++++
 5 files changed, 75 insertions(+), 33 deletions(-)
----------------------------------------------------------------------