You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2018/07/06 16:03:25 UTC
[1/2] storm git commit: STORM-3090 - use topic together with partition number during recreation of partition managers
Repository: storm
Updated Branches:
refs/heads/1.x-branch 9938ed357 -> 83e5788c0
STORM-3090 - use topic together with partition number during recreation of partition managers
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/35797bf1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/35797bf1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/35797bf1
Branch: refs/heads/1.x-branch
Commit: 35797bf1d2bc9dc9a382d2ab7078e6ef33237810
Parents: e694676
Author: Nikita Gorbachevsky <ni...@playtika.com>
Authored: Wed Jun 20 13:18:36 2018 +0300
Committer: Nikita Gorbachevsky <ni...@playtika.com>
Committed: Wed Jun 20 13:18:36 2018 +0300
----------------------------------------------------------------------
.../jvm/org/apache/storm/kafka/KafkaSpout.java | 23 +++++++------
.../apache/storm/kafka/PartitionManager.java | 6 ++--
.../org/apache/storm/kafka/ZkCoordinator.java | 29 +++++++++-------
.../test/org/apache/storm/kafka/TestUtils.java | 14 ++++----
.../apache/storm/kafka/ZkCoordinatorTest.java | 36 ++++++++++++++++++++
5 files changed, 75 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/35797bf1/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
index 5973932..4c307b8 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
@@ -162,15 +162,6 @@ public class KafkaSpout extends BaseRichSpout {
}
}
- private PartitionManager getManagerForPartition(int partition) {
- for (PartitionManager partitionManager: _coordinator.getMyManagedPartitions()) {
- if (partitionManager.getPartition().partition == partition) {
- return partitionManager;
- }
- }
- return null;
- }
-
@Override
public void ack(Object msgId) {
KafkaMessageId id = (KafkaMessageId) msgId;
@@ -179,7 +170,7 @@ public class KafkaSpout extends BaseRichSpout {
m.ack(id.offset);
} else {
// managers for partitions changed - try to find new manager responsible for that partition
- PartitionManager newManager = getManagerForPartition(id.partition.partition);
+ PartitionManager newManager = tryToFindNewManager(id.partition);
if (newManager != null) {
newManager.ack(id.offset);
}
@@ -194,7 +185,7 @@ public class KafkaSpout extends BaseRichSpout {
m.fail(id.offset);
} else {
// managers for partitions changed - try to find new manager responsible for that partition
- PartitionManager newManager = getManagerForPartition(id.partition.partition);
+ PartitionManager newManager = tryToFindNewManager(id.partition);
if (newManager != null) {
newManager.fail(id.offset);
}
@@ -256,6 +247,16 @@ public class KafkaSpout extends BaseRichSpout {
return configuration;
}
+ private PartitionManager tryToFindNewManager(Partition partition) {
+ for (PartitionManager partitionManager : _coordinator.getMyManagedPartitions()) {
+ if (partitionManager.getPartition().partition == partition.partition
+ && partitionManager.getPartition().topic.equals(partition.topic)) {
+ return partitionManager;
+ }
+ }
+ return null;
+ }
+
private void commit() {
_lastUpdateMs = System.currentTimeMillis();
for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
http://git-wip-us.apache.org/repos/asf/storm/blob/35797bf1/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index c025e3b..54beb83 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -104,9 +104,9 @@ public class PartitionManager {
_emittedToOffset = previousManager._emittedToOffset;
_waitingToEmit = previousManager._waitingToEmit;
_pending = previousManager._pending;
- LOG.info("Recreating PartitionManager based on previous manager, _waitingToEmit size: {}, _pending size: {}",
- _waitingToEmit.size(),
- _pending.size());
+ LOG.info("Recreating PartitionManager based on previous manager, host: {}, topic: {}, partition: {}, "
+ + "_committedTo: {}, _emittedToOffset: {}, _waitingToEmit size: {}, _pending size: {} ",
+ id.host, id.topic, id.partition, _committedTo, _emittedToOffset, _waitingToEmit.size(), _pending.size());
} else {
try {
_failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();
http://git-wip-us.apache.org/repos/asf/storm/blob/35797bf1/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
index d9dbfb3..10bef8e 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.kafka;
+import kafka.common.TopicAndPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.storm.kafka.trident.GlobalPartitionInformation;
@@ -33,8 +34,8 @@ public class ZkCoordinator implements PartitionCoordinator {
int _totalTasks;
int _taskId;
String _topologyInstanceId;
- Map<Partition, PartitionManager> _managers = new HashMap();
- List<PartitionManager> _cachedList = new ArrayList<PartitionManager>();
+ Map<Partition, PartitionManager> _managers = new HashMap<>();
+ List<PartitionManager> _cachedList = new ArrayList<>();
Long _lastRefreshTime = null;
int _refreshFreqMs;
DynamicPartitionConnections _connections;
@@ -82,22 +83,24 @@ public class ZkCoordinator implements PartitionCoordinator {
List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex, _taskId);
Set<Partition> curr = _managers.keySet();
- Set<Partition> newPartitions = new HashSet<Partition>(mine);
+ Set<Partition> newPartitions = new HashSet<>(mine);
newPartitions.removeAll(curr);
- Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
+ Set<Partition> deletedPartitions = new HashSet<>(curr);
deletedPartitions.removeAll(mine);
- LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Deleted partition managers: " + deletedPartitions.toString());
+ LOG.info("{} Deleted partition managers: {}",
+ taskPrefix(_taskIndex, _totalTasks, _taskId), deletedPartitions);
- Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
+ Map<TopicAndPartition, PartitionManager> deletedManagers = new HashMap<>();
for (Partition id : deletedPartitions) {
- deletedManagers.put(id.partition, _managers.remove(id));
+ PartitionManager manager = _managers.remove(id);
+ manager.close();
+ deletedManagers.put(new TopicAndPartition(id.topic, id.partition), manager);
}
- for (PartitionManager manager : deletedManagers.values()) {
- if (manager != null) manager.close();
- }
- LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " New partition managers: " + newPartitions.toString());
+
+ LOG.info("{} New partition managers: {}",
+ taskPrefix(_taskIndex, _totalTasks, _taskId), newPartitions);
for (Partition id : newPartitions) {
PartitionManager man = new PartitionManager(
@@ -107,14 +110,14 @@ public class ZkCoordinator implements PartitionCoordinator {
_stormConf,
_spoutConfig,
id,
- deletedManagers.get(id.partition));
+ deletedManagers.get(new TopicAndPartition(id.topic, id.partition)));
_managers.put(id, man);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
- _cachedList = new ArrayList<PartitionManager>(_managers.values());
+ _cachedList = new ArrayList<>(_managers.values());
LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Finished refreshing");
}
http://git-wip-us.apache.org/repos/asf/storm/blob/35797bf1/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java b/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java
index cc3f2be..cfc6f41 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java
@@ -40,20 +40,22 @@ public class TestUtils {
return buildPartitionInfo(numPartitions, 9092);
}
- public static List<GlobalPartitionInformation> buildPartitionInfoList(GlobalPartitionInformation partitionInformation) {
- List<GlobalPartitionInformation> map = new ArrayList<GlobalPartitionInformation>();
- map.add(partitionInformation);
- return map;
+ public static List<GlobalPartitionInformation> buildPartitionInfoList(GlobalPartitionInformation... partitionInformation) {
+ return new ArrayList<>(Arrays.asList(partitionInformation));
}
- public static GlobalPartitionInformation buildPartitionInfo(int numPartitions, int brokerPort) {
- GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TOPIC);
+ public static GlobalPartitionInformation buildPartitionInfo(String topic, int numPartitions, int brokerPort) {
+ GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(topic);
for (int i = 0; i < numPartitions; i++) {
globalPartitionInformation.addPartition(i, Broker.fromString("broker-" + i + " :" + brokerPort));
}
return globalPartitionInformation;
}
+ public static GlobalPartitionInformation buildPartitionInfo(int numPartitions, int brokerPort) {
+ return buildPartitionInfo(TOPIC, numPartitions, brokerPort);
+ }
+
public static SimpleConsumer getKafkaConsumer(KafkaTestBroker broker) {
BrokerHosts brokerHosts = getBrokerHosts(broker);
KafkaConfig kafkaConfig = new KafkaConfig(brokerHosts, TOPIC);
http://git-wip-us.apache.org/repos/asf/storm/blob/35797bf1/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
index 0b86845..64b9a9b 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
@@ -140,6 +140,42 @@ public class ZkCoordinatorTest {
}
}
+ @Test
+ public void testPartitionManagerRecreateMultipleTopics() throws Exception {
+ List<ZkCoordinator> coordinatorList = buildCoordinators(1);
+
+ when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(
+ TestUtils.buildPartitionInfo("topic1", 1, 9092),
+ TestUtils.buildPartitionInfo("topic2", 1, 9092)));
+
+ List<PartitionManager> partitionManagersBeforeRefresh = coordinatorList.get(0).getMyManagedPartitions();
+ assertEquals(2, partitionManagersBeforeRefresh.size());
+ for (PartitionManager partitionManager : partitionManagersBeforeRefresh) {
+ partitionManager._emittedToOffset = 100L;
+ partitionManager._committedTo = 100L;
+ }
+
+ waitForRefresh();
+
+ when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(
+ TestUtils.buildPartitionInfo("topic1", 1, 9093),
+ TestUtils.buildPartitionInfo("topic3", 1, 9093)));
+
+ List<PartitionManager> partitionManagersAfterRefresh = coordinatorList.get(0).getMyManagedPartitions();
+ assertEquals(2, partitionManagersAfterRefresh.size());
+ for (PartitionManager partitionManager : partitionManagersAfterRefresh) {
+ if (partitionManager.getPartition().topic.equals("topic1")) {
+ assertEquals(100, partitionManager._emittedToOffset.longValue());
+ assertEquals(100, partitionManager._committedTo.longValue());
+ }
+ if (partitionManager.getPartition().topic.equals("topic3")) {
+ // NO_OFFSET from KafkaUtils
+ assertEquals(-5, partitionManager._emittedToOffset.longValue());
+ assertEquals(-5, partitionManager._committedTo.longValue());
+ }
+ }
+ }
+
private void assertStateIsTheSame(PartitionManager managerBefore, PartitionManager managerAfter) {
// check if state was actually moved from old PartitionManager
assertNotNull(managerBefore);
[2/2] storm git commit: Merge branch 'STORM-3090' of https://github.com/choojoyq/storm into asfgit-1.x-branch
Posted by sr...@apache.org.
Merge branch 'STORM-3090' of https://github.com/choojoyq/storm into asfgit-1.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/83e5788c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/83e5788c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/83e5788c
Branch: refs/heads/1.x-branch
Commit: 83e5788c06863cbf4ad336dbfb142aa237b0cba0
Parents: 9938ed3 35797bf
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Fri Jul 6 17:42:20 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Fri Jul 6 17:42:20 2018 +0200
----------------------------------------------------------------------
.../jvm/org/apache/storm/kafka/KafkaSpout.java | 23 +++++++------
.../apache/storm/kafka/PartitionManager.java | 6 ++--
.../org/apache/storm/kafka/ZkCoordinator.java | 29 +++++++++-------
.../test/org/apache/storm/kafka/TestUtils.java | 14 ++++----
.../apache/storm/kafka/ZkCoordinatorTest.java | 36 ++++++++++++++++++++
5 files changed, 75 insertions(+), 33 deletions(-)
----------------------------------------------------------------------