You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/02/03 23:25:09 UTC

[1/3] storm git commit: STORM-2296 Kafka spout no dup on leader changes

Repository: storm
Updated Branches:
  refs/heads/1.x-branch d9efb5ec2 -> 553eef1c4


STORM-2296 Kafka spout no dup on leader changes


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b0763999
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b0763999
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b0763999

Branch: refs/heads/1.x-branch
Commit: b076399922cedf344c0851ca625e8cac20917cc8
Parents: 229c7db
Author: Ernestas Vaiciukevicius <e....@adform.com>
Authored: Thu Jan 12 16:54:59 2017 +0200
Committer: Ernestas Vaiciukevicius <e....@adform.com>
Committed: Fri Feb 3 21:59:30 2017 +0200

----------------------------------------------------------------------
 .../apache/storm/kafka/PartitionManager.java    | 125 ++++++++++++-------
 .../org/apache/storm/kafka/ZkCoordinator.java   |  16 ++-
 .../apache/storm/kafka/ZkCoordinatorTest.java   |  42 ++++++-
 3 files changed, 131 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b0763999/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index 79e7c3d..bc355ba 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -66,7 +66,29 @@ public class PartitionManager {
     ZkState _state;
     Map _stormConf;
     long numberFailed, numberAcked;
-    public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
+
+    public PartitionManager(
+            DynamicPartitionConnections connections,
+            String topologyInstanceId,
+            ZkState state,
+            Map stormConf,
+            SpoutConfig spoutConfig,
+            Partition id)
+    {
+        this(connections, topologyInstanceId, state, stormConf, spoutConfig, id, null);
+    }
+
+    /**
+     * @param previousManager previous partition manager if manager for partition is being recreated
+     */
+    public PartitionManager(
+            DynamicPartitionConnections connections,
+            String topologyInstanceId,
+            ZkState state,
+            Map stormConf,
+            SpoutConfig spoutConfig,
+            Partition id,
+            PartitionManager previousManager) {
         _partition = id;
         _connections = connections;
         _spoutConfig = spoutConfig;
@@ -76,53 +98,64 @@ public class PartitionManager {
         _stormConf = stormConf;
         numberAcked = numberFailed = 0;
 
-        try {
-            _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();
-            _failedMsgRetryManager.prepare(spoutConfig, _stormConf);
-        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
-            throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>",
-                                                             FailedMsgRetryManager.class,
-                                                             spoutConfig.failedMsgRetryManagerClass), e);
-        }
+        if (previousManager != null) {
+            _failedMsgRetryManager = previousManager._failedMsgRetryManager;
+            _committedTo = previousManager._committedTo;
+            _emittedToOffset = previousManager._emittedToOffset;
+            _waitingToEmit = previousManager._waitingToEmit;
+            _pending = previousManager._pending;
+            LOG.info("Recreating PartitionManager based on previous manager, _waitingToEmit size: {}, _pending size: {}",
+                    _waitingToEmit.size(),
+                    _pending.size());
+        } else {
+            try {
+                _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();
+                _failedMsgRetryManager.prepare(spoutConfig, _stormConf);
+            } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+                throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>",
+                        FailedMsgRetryManager.class,
+                        spoutConfig.failedMsgRetryManagerClass), e);
+            }
 
-        String jsonTopologyId = null;
-        Long jsonOffset = null;
-        String path = committedPath();
-        try {
-            Map<Object, Object> json = _state.readJSON(path);
-            LOG.info("Read partition information from: " + path +  "  --> " + json );
-            if (json != null) {
-                jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
-                jsonOffset = (Long) json.get("offset");
+            String jsonTopologyId = null;
+            Long jsonOffset = null;
+            String path = committedPath();
+            try {
+                Map<Object, Object> json = _state.readJSON(path);
+                LOG.info("Read partition information from: " + path + "  --> " + json);
+                if (json != null) {
+                    jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
+                    jsonOffset = (Long) json.get("offset");
+                }
+            } catch (Throwable e) {
+                LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
             }
-        } catch (Throwable e) {
-            LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
-        }
 
-        String topic = _partition.topic;
-        Long currentOffset = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig);
+            String topic = _partition.topic;
+            Long currentOffset = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig);
 
-        if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
-            _committedTo = currentOffset;
-            LOG.info("No partition information found, using configuration to determine offset");
-        } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) {
-            _committedTo = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig.startOffsetTime);
-            LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset");
-        } else {
-            _committedTo = jsonOffset;
-            LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
-        }
+            if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
+                _committedTo = currentOffset;
+                LOG.info("No partition information found, using configuration to determine offset");
+            } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) {
+                _committedTo = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig.startOffsetTime);
+                LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset");
+            } else {
+                _committedTo = jsonOffset;
+                LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId);
+            }
 
-        if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
-            LOG.info("Last commit offset from zookeeper: " + _committedTo);
-            Long lastCommittedOffset = _committedTo;
-            _committedTo = currentOffset;
-            LOG.info("Commit offset " + lastCommittedOffset + " is more than " +
-                    spoutConfig.maxOffsetBehind + " behind latest offset " + currentOffset + ", resetting to startOffsetTime=" + spoutConfig.startOffsetTime);
-        }
+            if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
+                LOG.info("Last commit offset from zookeeper: " + _committedTo);
+                Long lastCommittedOffset = _committedTo;
+                _committedTo = currentOffset;
+                LOG.info("Commit offset " + lastCommittedOffset + " is more than " +
+                        spoutConfig.maxOffsetBehind + " behind latest offset " + currentOffset + ", resetting to startOffsetTime=" + spoutConfig.startOffsetTime);
+            }
 
-        LOG.info("Starting Kafka " + _consumer.host() + " " + id + " from offset " + _committedTo);
-        _emittedToOffset = _committedTo;
+            LOG.info("Starting Kafka " + _consumer.host() + " " + id + " from offset " + _committedTo);
+            _emittedToOffset = _committedTo;
+        }
 
         _fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
         _fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
@@ -160,7 +193,7 @@ public class PartitionManager {
             } else {
                 tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.message(), _partition.topic);
             }
-            
+
             if ((tups != null) && tups.iterator().hasNext()) {
                if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
                     for (List<Object> tup : tups) {
@@ -201,7 +234,7 @@ public class PartitionManager {
         } catch (TopicOffsetOutOfRangeException e) {
             offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
             // fetch failed, so don't update the fetch metrics
-            
+
             //fix bug [STORM-643] : remove outdated failed offsets
             if (!processingNewTuples) {
                 // For the case of EarliestTime it would be better to discard
@@ -214,7 +247,7 @@ public class PartitionManager {
                 if (null != omitted) {
                     _lostMessageCount.incrBy(omitted.size());
                 }
-                
+
                 LOG.warn("Removing the failed offsets for {} that are out of range: {}", _partition, omitted);
             }
 
@@ -223,7 +256,7 @@ public class PartitionManager {
                 _emittedToOffset = offset;
                 LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
             }
-            
+
             return;
         }
         long end = System.nanoTime();

http://git-wip-us.apache.org/repos/asf/storm/blob/b0763999/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
index 98bf8a0..14be584 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
@@ -88,14 +88,24 @@ public class ZkCoordinator implements PartitionCoordinator {
 
             LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString());
 
+            Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
             for (Partition id : deletedPartitions) {
-                PartitionManager man = _managers.remove(id);
-                man.close();
+                deletedManagers.put(id.partition, _managers.remove(id));
+            }
+            for (PartitionManager manager : deletedManagers.values()) {
+                if (manager != null) manager.close();
             }
             LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString());
 
             for (Partition id : newPartitions) {
-                PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id);
+                PartitionManager man = new PartitionManager(
+                        _connections,
+                        _topologyInstanceId,
+                        _state,
+                        _stormConf,
+                        _spoutConfig,
+                        id,
+                        deletedManagers.get(id.partition));
                 _managers.put(id, man);
             }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/b0763999/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
index 65bf0b4..adef740 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
@@ -28,8 +28,7 @@ import org.mockito.MockitoAnnotations;
 
 import java.util.*;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.*;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.when;
@@ -106,7 +105,7 @@ public class ZkCoordinatorTest {
         waitForRefresh();
         when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093)));
         List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
-        assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size());
+        assertEquals(partitionManagersBeforeRefresh.size(), partitionManagersAfterRefresh.size());
         Iterator<List<PartitionManager>> iterator = partitionManagersAfterRefresh.iterator();
         for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) {
             List<PartitionManager> partitionManagersAfter = iterator.next();
@@ -114,6 +113,43 @@ public class ZkCoordinatorTest {
         }
     }
 
+    @Test
+    public void testPartitionManagerRecreate() throws Exception {
+        final int totalTasks = 2;
+        int partitionsPerTask = 2;
+        List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask);
+        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9092)));
+        List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList);
+        waitForRefresh();
+        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093)));
+        List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
+        assertEquals(partitionManagersBeforeRefresh.size(), partitionManagersAfterRefresh.size());
+
+        HashMap<Integer, PartitionManager> managersAfterRefresh = new HashMap<Integer, PartitionManager>();
+        for (List<PartitionManager> partitionManagersAfter : partitionManagersAfterRefresh) {
+            for (PartitionManager manager : partitionManagersAfter) {
+                assertFalse("Multiple PartitionManagers for same partition", managersAfterRefresh.containsKey(manager.getPartition().partition));
+                managersAfterRefresh.put(manager.getPartition().partition, manager);
+            }
+        }
+
+        for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) {
+            for (PartitionManager manager : partitionManagersBefore) {
+                assertStateIsTheSame(manager, managersAfterRefresh.get(manager.getPartition().partition));
+            }
+        }
+    }
+
+    private void assertStateIsTheSame(PartitionManager managerBefore, PartitionManager managerAfter) {
+        // check if state was actually moved from old PartitionManager
+        assertNotNull(managerBefore);
+        assertNotNull(managerAfter);
+        assertNotSame(managerBefore, managerAfter);
+        assertSame(managerBefore._waitingToEmit, managerAfter._waitingToEmit);
+        assertSame(managerBefore._emittedToOffset, managerAfter._emittedToOffset);
+        assertSame(managerBefore._committedTo, managerAfter._committedTo);
+    }
+
     private void assertPartitionsAreDifferent(List<PartitionManager> partitionManagersBefore, List<PartitionManager> partitionManagersAfter, int partitionsPerTask) {
         assertEquals(partitionsPerTask, partitionManagersBefore.size());
         assertEquals(partitionManagersBefore.size(), partitionManagersAfter.size());


[3/3] storm git commit: STORM-2296: CHANGELOG

Posted by ka...@apache.org.
STORM-2296: CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/553eef1c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/553eef1c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/553eef1c

Branch: refs/heads/1.x-branch
Commit: 553eef1c48792cdfab540598fcdba17088a170e8
Parents: c9fb7a4
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Feb 4 08:24:54 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Feb 4 08:24:54 2017 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/553eef1c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a29c5e0..2e9af7b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.0
+ * STORM-2296: Kafka spout no dup on leader changes
  * STORM-2244: Some shaded jars doesn't exclude dependency signature files
  * STORM-2014: New Kafka spout duplicates checking if failed messages have reached max retries
  * STORM-2324: Fix deployment failure if resources directory is missing in topology jar


[2/3] storm git commit: Merge branch 'kafka_spout_no_dup_on_leader_changes_1_x' of https://github.com/ernisv/storm into STORM-2296-1.x-merge

Posted by ka...@apache.org.
Merge branch 'kafka_spout_no_dup_on_leader_changes_1_x' of https://github.com/ernisv/storm into STORM-2296-1.x-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c9fb7a4b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c9fb7a4b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c9fb7a4b

Branch: refs/heads/1.x-branch
Commit: c9fb7a4b9661cc92acaf4a9fc4284b61200d39bf
Parents: d9efb5e b076399
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Feb 4 08:00:47 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Feb 4 08:00:47 2017 +0900

----------------------------------------------------------------------
 .../apache/storm/kafka/PartitionManager.java    | 125 ++++++++++++-------
 .../org/apache/storm/kafka/ZkCoordinator.java   |  16 ++-
 .../apache/storm/kafka/ZkCoordinatorTest.java   |  42 ++++++-
 3 files changed, 131 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c9fb7a4b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/c9fb7a4b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
----------------------------------------------------------------------