You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/07 03:17:48 UTC

[1/3] storm git commit: STORM-2506: Print mapping between Task ID and Kafka Partitions

Repository: storm
Updated Branches:
  refs/heads/1.x-branch c66a916c9 -> 8473376c7


STORM-2506: Print mapping between Task ID and Kafka Partitions


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/28284002
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/28284002
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/28284002

Branch: refs/heads/1.x-branch
Commit: 282840028f3285321de54077e5690805db39b5d2
Parents: fb24460
Author: Srishty Agrawal <sa...@groupon.com>
Authored: Wed Mar 29 12:35:57 2017 -0700
Committer: Erik Weathers <er...@gmail.com>
Committed: Wed Jul 5 14:48:50 2017 -0700

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |  4 ++--
 .../jvm/org/apache/storm/kafka/KafkaSpout.java  |  4 ++--
 .../jvm/org/apache/storm/kafka/KafkaUtils.java  | 17 +++++++++--------
 .../apache/storm/kafka/StaticCoordinator.java   |  4 ++--
 .../org/apache/storm/kafka/ZkCoordinator.java   | 20 +++++++++++---------
 .../org/apache/storm/kafka/KafkaUtilsTest.java  |  8 ++++----
 .../apache/storm/kafka/ZkCoordinatorTest.java   |  2 +-
 7 files changed, 31 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/28284002/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 58347e3..fcef7ac 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -139,8 +139,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-            LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
-                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+            LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]",
+                    context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
 
             initialize(partitions);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/28284002/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
index 01cc9b7..3abadf2 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
@@ -82,11 +82,11 @@ public class KafkaSpout extends BaseRichSpout {
         if (_spoutConfig.hosts instanceof StaticHosts) {
             _coordinator = new StaticCoordinator(_connections, conf,
                     _spoutConfig, _state, context.getThisTaskIndex(),
-                    totalTasks, topologyInstanceId);
+                    totalTasks, context.getThisTaskId(), topologyInstanceId);
         } else {
             _coordinator = new ZkCoordinator(_connections, conf,
                     _spoutConfig, _state, context.getThisTaskIndex(),
-                    totalTasks, topologyInstanceId);
+                    totalTasks, context.getThisTaskId(), topologyInstanceId);
         }
 
         context.registerMetric("kafkaOffset", new IMetric() {

http://git-wip-us.apache.org/repos/asf/storm/blob/28284002/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
index f23c873..73e86e9 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
@@ -254,7 +254,8 @@ public class KafkaUtils {
     }
 
 
-    public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons, int totalTasks, int taskIndex) {
+    public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons,
+            int totalTasks, int taskIndex, int taskId) {
         Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");
         List<Partition> taskPartitions = new ArrayList<Partition>();
         List<Partition> partitions = new ArrayList<Partition>();
@@ -269,20 +270,20 @@ public class KafkaUtils {
             Partition taskPartition = partitions.get(i);
             taskPartitions.add(taskPartition);
         }
-        logPartitionMapping(totalTasks, taskIndex, taskPartitions);
+        logPartitionMapping(totalTasks, taskIndex, taskPartitions, taskId);
         return taskPartitions;
     }
 
-    private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions) {
-        String taskPrefix = taskId(taskIndex, totalTasks);
+    private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions, int taskId) {
+        String taskPrefix = taskPrefix(taskIndex, totalTasks, taskId);
         if (taskPartitions.isEmpty()) {
-            LOG.warn(taskPrefix + "no partitions assigned");
+            LOG.warn(taskPrefix + " no partitions assigned");
         } else {
-            LOG.info(taskPrefix + "assigned " + taskPartitions);
+            LOG.info(taskPrefix + " assigned " + taskPartitions);
         }
     }
 
-    public static String taskId(int taskIndex, int totalTasks) {
-        return "Task [" + (taskIndex + 1) + "/" + totalTasks + "] ";
+    public static String taskPrefix(int taskIndex, int totalTasks, int taskId) {
+        return "Task [" + (taskIndex + 1) + "/" + totalTasks + "], Task-ID: " + taskId;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/28284002/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
index 628bfc0..c3c5e97 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
@@ -26,11 +26,11 @@ public class StaticCoordinator implements PartitionCoordinator {
     Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
     List<PartitionManager> _allManagers = new ArrayList<>();
 
-    public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
+    public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, int taskId, String topologyInstanceId) {
         StaticHosts hosts = (StaticHosts) config.hosts;
         List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
         partitions.add(hosts.getPartitionInformation());
-        List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex);
+        List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex, taskId);
         for (Partition myPartition : myPartitions) {
             _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition));
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/28284002/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
index 14be584..d9dbfb3 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
@@ -23,7 +23,7 @@ import org.apache.storm.kafka.trident.GlobalPartitionInformation;
 
 import java.util.*;
 
-import static org.apache.storm.kafka.KafkaUtils.taskId;
+import static org.apache.storm.kafka.KafkaUtils.taskPrefix;
 
 public class ZkCoordinator implements PartitionCoordinator {
     private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class);
@@ -31,6 +31,7 @@ public class ZkCoordinator implements PartitionCoordinator {
     SpoutConfig _spoutConfig;
     int _taskIndex;
     int _totalTasks;
+    int _taskId;
     String _topologyInstanceId;
     Map<Partition, PartitionManager> _managers = new HashMap();
     List<PartitionManager> _cachedList = new ArrayList<PartitionManager>();
@@ -41,15 +42,16 @@ public class ZkCoordinator implements PartitionCoordinator {
     ZkState _state;
     Map _stormConf;
 
-    public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
-        this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig));
+    public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, int taskId, String topologyInstanceId) {
+        this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, taskId, topologyInstanceId, buildReader(stormConf, spoutConfig));
     }
 
-    public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) {
+    public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, int taskId, String topologyInstanceId, DynamicBrokersReader reader) {
         _spoutConfig = spoutConfig;
         _connections = connections;
         _taskIndex = taskIndex;
         _totalTasks = totalTasks;
+        _taskId = taskId;
         _topologyInstanceId = topologyInstanceId;
         _stormConf = stormConf;
         _state = state;
@@ -75,9 +77,9 @@ public class ZkCoordinator implements PartitionCoordinator {
     @Override
     public void refresh() {
         try {
-            LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections");
+            LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Refreshing partition manager connections");
             List<GlobalPartitionInformation> brokerInfo = _reader.getBrokerInfo();
-            List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex);
+            List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex, _taskId);
 
             Set<Partition> curr = _managers.keySet();
             Set<Partition> newPartitions = new HashSet<Partition>(mine);
@@ -86,7 +88,7 @@ public class ZkCoordinator implements PartitionCoordinator {
             Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
             deletedPartitions.removeAll(mine);
 
-            LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString());
+            LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Deleted partition managers: " + deletedPartitions.toString());
 
             Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
             for (Partition id : deletedPartitions) {
@@ -95,7 +97,7 @@ public class ZkCoordinator implements PartitionCoordinator {
             for (PartitionManager manager : deletedManagers.values()) {
                 if (manager != null) manager.close();
             }
-            LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString());
+            LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " New partition managers: " + newPartitions.toString());
 
             for (Partition id : newPartitions) {
                 PartitionManager man = new PartitionManager(
@@ -113,7 +115,7 @@ public class ZkCoordinator implements PartitionCoordinator {
             throw new RuntimeException(e);
         }
         _cachedList = new ArrayList<PartitionManager>(_managers.values());
-        LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing");
+        LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Finished refreshing");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/28284002/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
index 9da6c0a..9362f91 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
@@ -271,7 +271,7 @@ public class KafkaUtilsTest {
         partitions.add(globalPartitionInformation);
         int numTasks = numPartitions / partitionsPerTask;
         for (int i = 0 ; i < numTasks ; i++) {
-            assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i).size());
+            assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i, i).size());
         }
     }
 
@@ -281,8 +281,8 @@ public class KafkaUtilsTest {
         List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
         partitions.add(globalPartitionInformation);
         int numTasks = 2;
-        assertEquals(1, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 0).size());
-        assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1).size());
+        assertEquals(1, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 0, 0).size());
+        assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1, 1).size());
     }
 
     @Test (expected = IllegalArgumentException.class )
@@ -290,6 +290,6 @@ public class KafkaUtilsTest {
         GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC);
         List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
         partitions.add(globalPartitionInformation);
-        KafkaUtils.calculatePartitionsForTask(partitions, 1, 1);
+        KafkaUtils.calculatePartitionsForTask(partitions, 1, 1, 1);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/28284002/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
index b23d5bc..0b86845 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
@@ -174,7 +174,7 @@ public class ZkCoordinatorTest {
     private List<ZkCoordinator> buildCoordinators(int totalTasks) {
         List<ZkCoordinator> coordinatorList = new ArrayList<ZkCoordinator>();
         for (int i = 0; i < totalTasks; i++) {
-            ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, stormConf, spoutConfig, state, i, totalTasks, "test-id", reader);
+            ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, stormConf, spoutConfig, state, i, totalTasks, i, "test-id", reader);
             coordinatorList.add(coordinator);
         }
         return coordinatorList;


[3/3] storm git commit: STORM-2506: CHANGELOG

Posted by ka...@apache.org.
STORM-2506: CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8473376c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8473376c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8473376c

Branch: refs/heads/1.x-branch
Commit: 8473376c7c63385bfd62e56e963b0da76b15afd0
Parents: c90d140
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Jul 7 12:17:34 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Jul 7 12:17:34 2017 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8473376c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 98a2320..db17339 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.2.0
+ * STORM-2506: Print mapping between Task ID and Kafka Partitions
  * STORM-2601: add the timeout parameter to the method of getting the nimbus client
  * STORM-2369: [storm-redis] Use binary type for State management
  * STORM-2599: Fix BasicContainer wildcard classpath on Windows


[2/3] storm git commit: Merge branch 'STORM-2506--1.x-branch' of https://github.com/erikdw/storm into STORM-2506-1.x

Posted by ka...@apache.org.
Merge branch 'STORM-2506--1.x-branch' of https://github.com/erikdw/storm into STORM-2506-1.x


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c90d140b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c90d140b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c90d140b

Branch: refs/heads/1.x-branch
Commit: c90d140bed6f2b578e0a6cb130ab3546148ccb42
Parents: c66a916 2828400
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Jul 7 12:15:46 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Jul 7 12:15:46 2017 +0900

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |  4 ++--
 .../jvm/org/apache/storm/kafka/KafkaSpout.java  |  4 ++--
 .../jvm/org/apache/storm/kafka/KafkaUtils.java  | 17 +++++++++--------
 .../apache/storm/kafka/StaticCoordinator.java   |  4 ++--
 .../org/apache/storm/kafka/ZkCoordinator.java   | 20 +++++++++++---------
 .../org/apache/storm/kafka/KafkaUtilsTest.java  |  8 ++++----
 .../apache/storm/kafka/ZkCoordinatorTest.java   |  2 +-
 7 files changed, 31 insertions(+), 28 deletions(-)
----------------------------------------------------------------------