You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/02/03 23:25:09 UTC
[1/3] storm git commit: STORM-2296 Kafka spout no dup on leader changes
Repository: storm
Updated Branches:
refs/heads/1.x-branch d9efb5ec2 -> 553eef1c4
STORM-2296 Kafka spout no dup on leader changes
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b0763999
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b0763999
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b0763999
Branch: refs/heads/1.x-branch
Commit: b076399922cedf344c0851ca625e8cac20917cc8
Parents: 229c7db
Author: Ernestas Vaiciukevicius <e....@adform.com>
Authored: Thu Jan 12 16:54:59 2017 +0200
Committer: Ernestas Vaiciukevicius <e....@adform.com>
Committed: Fri Feb 3 21:59:30 2017 +0200
----------------------------------------------------------------------
.../apache/storm/kafka/PartitionManager.java | 125 ++++++++++++-------
.../org/apache/storm/kafka/ZkCoordinator.java | 16 ++-
.../apache/storm/kafka/ZkCoordinatorTest.java | 42 ++++++-
3 files changed, 131 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b0763999/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index 79e7c3d..bc355ba 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -66,7 +66,29 @@ public class PartitionManager {
ZkState _state;
Map _stormConf;
long numberFailed, numberAcked;
- public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
+
+ public PartitionManager(
+ DynamicPartitionConnections connections,
+ String topologyInstanceId,
+ ZkState state,
+ Map stormConf,
+ SpoutConfig spoutConfig,
+ Partition id)
+ {
+ this(connections, topologyInstanceId, state, stormConf, spoutConfig, id, null);
+ }
+
+ /**
+ * @param previousManager previous partition manager if manager for partition is being recreated
+ */
+ public PartitionManager(
+ DynamicPartitionConnections connections,
+ String topologyInstanceId,
+ ZkState state,
+ Map stormConf,
+ SpoutConfig spoutConfig,
+ Partition id,
+ PartitionManager previousManager) {
_partition = id;
_connections = connections;
_spoutConfig = spoutConfig;
@@ -76,53 +98,64 @@ public class PartitionManager {
_stormConf = stormConf;
numberAcked = numberFailed = 0;
- try {
- _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();
- _failedMsgRetryManager.prepare(spoutConfig, _stormConf);
- } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
- throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>",
- FailedMsgRetryManager.class,
- spoutConfig.failedMsgRetryManagerClass), e);
- }
+ if (previousManager != null) {
+ _failedMsgRetryManager = previousManager._failedMsgRetryManager;
+ _committedTo = previousManager._committedTo;
+ _emittedToOffset = previousManager._emittedToOffset;
+ _waitingToEmit = previousManager._waitingToEmit;
+ _pending = previousManager._pending;
+ LOG.info("Recreating PartitionManager based on previous manager, _waitingToEmit size: {}, _pending size: {}",
+ _waitingToEmit.size(),
+ _pending.size());
+ } else {
+ try {
+ _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();
+ _failedMsgRetryManager.prepare(spoutConfig, _stormConf);
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+ throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>",
+ FailedMsgRetryManager.class,
+ spoutConfig.failedMsgRetryManagerClass), e);
+ }
- String jsonTopologyId = null;
- Long jsonOffset = null;
- String path = committedPath();
- try {
- Map<Object, Object> json = _state.readJSON(path);
- LOG.info("Read partition information from: " + path + " --> " + json );
- if (json != null) {
- jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
- jsonOffset = (Long) json.get("offset");
+ String jsonTopologyId = null;
+ Long jsonOffset = null;
+ String path = committedPath();
+ try {
+ Map<Object, Object> json = _state.readJSON(path);
+ LOG.info("Read partition information from: " + path + " --> " + json);
+ if (json != null) {
+ jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
+ jsonOffset = (Long) json.get("offset");
+ }
+ } catch (Throwable e) {
+ LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
}
- } catch (Throwable e) {
- LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
- }
- String topic = _partition.topic;
- Long currentOffset = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig);
+ String topic = _partition.topic;
+ Long currentOffset = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig);
- if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
- _committedTo = currentOffset;
- LOG.info("No partition information found, using configuration to determine offset");
- } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) {
- _committedTo = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig.startOffsetTime);
- LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset");
- } else {
- _committedTo = jsonOffset;
- LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
- }
+ if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
+ _committedTo = currentOffset;
+ LOG.info("No partition information found, using configuration to determine offset");
+ } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) {
+ _committedTo = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig.startOffsetTime);
+ LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset");
+ } else {
+ _committedTo = jsonOffset;
+ LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId);
+ }
- if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
- LOG.info("Last commit offset from zookeeper: " + _committedTo);
- Long lastCommittedOffset = _committedTo;
- _committedTo = currentOffset;
- LOG.info("Commit offset " + lastCommittedOffset + " is more than " +
- spoutConfig.maxOffsetBehind + " behind latest offset " + currentOffset + ", resetting to startOffsetTime=" + spoutConfig.startOffsetTime);
- }
+ if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
+ LOG.info("Last commit offset from zookeeper: " + _committedTo);
+ Long lastCommittedOffset = _committedTo;
+ _committedTo = currentOffset;
+ LOG.info("Commit offset " + lastCommittedOffset + " is more than " +
+ spoutConfig.maxOffsetBehind + " behind latest offset " + currentOffset + ", resetting to startOffsetTime=" + spoutConfig.startOffsetTime);
+ }
- LOG.info("Starting Kafka " + _consumer.host() + " " + id + " from offset " + _committedTo);
- _emittedToOffset = _committedTo;
+ LOG.info("Starting Kafka " + _consumer.host() + " " + id + " from offset " + _committedTo);
+ _emittedToOffset = _committedTo;
+ }
_fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
_fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
@@ -160,7 +193,7 @@ public class PartitionManager {
} else {
tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.message(), _partition.topic);
}
-
+
if ((tups != null) && tups.iterator().hasNext()) {
if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
for (List<Object> tup : tups) {
@@ -201,7 +234,7 @@ public class PartitionManager {
} catch (TopicOffsetOutOfRangeException e) {
offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
// fetch failed, so don't update the fetch metrics
-
+
//fix bug [STORM-643] : remove outdated failed offsets
if (!processingNewTuples) {
// For the case of EarliestTime it would be better to discard
@@ -214,7 +247,7 @@ public class PartitionManager {
if (null != omitted) {
_lostMessageCount.incrBy(omitted.size());
}
-
+
LOG.warn("Removing the failed offsets for {} that are out of range: {}", _partition, omitted);
}
@@ -223,7 +256,7 @@ public class PartitionManager {
_emittedToOffset = offset;
LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
}
-
+
return;
}
long end = System.nanoTime();
http://git-wip-us.apache.org/repos/asf/storm/blob/b0763999/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
index 98bf8a0..14be584 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
@@ -88,14 +88,24 @@ public class ZkCoordinator implements PartitionCoordinator {
LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString());
+ Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
for (Partition id : deletedPartitions) {
- PartitionManager man = _managers.remove(id);
- man.close();
+ deletedManagers.put(id.partition, _managers.remove(id));
+ }
+ for (PartitionManager manager : deletedManagers.values()) {
+ if (manager != null) manager.close();
}
LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString());
for (Partition id : newPartitions) {
- PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id);
+ PartitionManager man = new PartitionManager(
+ _connections,
+ _topologyInstanceId,
+ _state,
+ _stormConf,
+ _spoutConfig,
+ id,
+ deletedManagers.get(id.partition));
_managers.put(id, man);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b0763999/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
index 65bf0b4..adef740 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
@@ -28,8 +28,7 @@ import org.mockito.MockitoAnnotations;
import java.util.*;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.when;
@@ -106,7 +105,7 @@ public class ZkCoordinatorTest {
waitForRefresh();
when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093)));
List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
- assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size());
+ assertEquals(partitionManagersBeforeRefresh.size(), partitionManagersAfterRefresh.size());
Iterator<List<PartitionManager>> iterator = partitionManagersAfterRefresh.iterator();
for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) {
List<PartitionManager> partitionManagersAfter = iterator.next();
@@ -114,6 +113,43 @@ public class ZkCoordinatorTest {
}
}
+ @Test
+ public void testPartitionManagerRecreate() throws Exception {
+ final int totalTasks = 2;
+ int partitionsPerTask = 2;
+ List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask);
+ when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9092)));
+ List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList);
+ waitForRefresh();
+ when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093)));
+ List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
+ assertEquals(partitionManagersBeforeRefresh.size(), partitionManagersAfterRefresh.size());
+
+ HashMap<Integer, PartitionManager> managersAfterRefresh = new HashMap<Integer, PartitionManager>();
+ for (List<PartitionManager> partitionManagersAfter : partitionManagersAfterRefresh) {
+ for (PartitionManager manager : partitionManagersAfter) {
+ assertFalse("Multiple PartitionManagers for same partition", managersAfterRefresh.containsKey(manager.getPartition().partition));
+ managersAfterRefresh.put(manager.getPartition().partition, manager);
+ }
+ }
+
+ for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) {
+ for (PartitionManager manager : partitionManagersBefore) {
+ assertStateIsTheSame(manager, managersAfterRefresh.get(manager.getPartition().partition));
+ }
+ }
+ }
+
+ private void assertStateIsTheSame(PartitionManager managerBefore, PartitionManager managerAfter) {
+ // check if state was actually moved from old PartitionManager
+ assertNotNull(managerBefore);
+ assertNotNull(managerAfter);
+ assertNotSame(managerBefore, managerAfter);
+ assertSame(managerBefore._waitingToEmit, managerAfter._waitingToEmit);
+ assertSame(managerBefore._emittedToOffset, managerAfter._emittedToOffset);
+ assertSame(managerBefore._committedTo, managerAfter._committedTo);
+ }
+
private void assertPartitionsAreDifferent(List<PartitionManager> partitionManagersBefore, List<PartitionManager> partitionManagersAfter, int partitionsPerTask) {
assertEquals(partitionsPerTask, partitionManagersBefore.size());
assertEquals(partitionManagersBefore.size(), partitionManagersAfter.size());
[3/3] storm git commit: STORM-2296: CHANGELOG
Posted by ka...@apache.org.
STORM-2296: CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/553eef1c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/553eef1c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/553eef1c
Branch: refs/heads/1.x-branch
Commit: 553eef1c48792cdfab540598fcdba17088a170e8
Parents: c9fb7a4
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Feb 4 08:24:54 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Feb 4 08:24:54 2017 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/553eef1c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a29c5e0..2e9af7b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.0
+ * STORM-2296: Kafka spout no dup on leader changes
* STORM-2244: Some shaded jars doesn't exclude dependency signature files
* STORM-2014: New Kafka spout duplicates checking if failed messages have reached max retries
* STORM-2324: Fix deployment failure if resources directory is missing in topology jar
[2/3] storm git commit: Merge branch 'kafka_spout_no_dup_on_leader_changes_1_x' of https://github.com/ernisv/storm into STORM-2296-1.x-merge
Posted by ka...@apache.org.
Merge branch 'kafka_spout_no_dup_on_leader_changes_1_x' of https://github.com/ernisv/storm into STORM-2296-1.x-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c9fb7a4b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c9fb7a4b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c9fb7a4b
Branch: refs/heads/1.x-branch
Commit: c9fb7a4b9661cc92acaf4a9fc4284b61200d39bf
Parents: d9efb5e b076399
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Feb 4 08:00:47 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Feb 4 08:00:47 2017 +0900
----------------------------------------------------------------------
.../apache/storm/kafka/PartitionManager.java | 125 ++++++++++++-------
.../org/apache/storm/kafka/ZkCoordinator.java | 16 ++-
.../apache/storm/kafka/ZkCoordinatorTest.java | 42 ++++++-
3 files changed, 131 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c9fb7a4b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c9fb7a4b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
----------------------------------------------------------------------