You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/07 03:17:48 UTC
[1/3] storm git commit: STORM-2506: Print mapping between Task ID and
Kafka Partitions
Repository: storm
Updated Branches:
refs/heads/1.x-branch c66a916c9 -> 8473376c7
STORM-2506: Print mapping between Task ID and Kafka Partitions
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/28284002
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/28284002
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/28284002
Branch: refs/heads/1.x-branch
Commit: 282840028f3285321de54077e5690805db39b5d2
Parents: fb24460
Author: Srishty Agrawal <sa...@groupon.com>
Authored: Wed Mar 29 12:35:57 2017 -0700
Committer: Erik Weathers <er...@gmail.com>
Committed: Wed Jul 5 14:48:50 2017 -0700
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 4 ++--
.../jvm/org/apache/storm/kafka/KafkaSpout.java | 4 ++--
.../jvm/org/apache/storm/kafka/KafkaUtils.java | 17 +++++++++--------
.../apache/storm/kafka/StaticCoordinator.java | 4 ++--
.../org/apache/storm/kafka/ZkCoordinator.java | 20 +++++++++++---------
.../org/apache/storm/kafka/KafkaUtilsTest.java | 8 ++++----
.../apache/storm/kafka/ZkCoordinatorTest.java | 2 +-
7 files changed, 31 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/28284002/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 58347e3..fcef7ac 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -139,8 +139,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
- LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
- kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+ LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]",
+ context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
initialize(partitions);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/28284002/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
index 01cc9b7..3abadf2 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
@@ -82,11 +82,11 @@ public class KafkaSpout extends BaseRichSpout {
if (_spoutConfig.hosts instanceof StaticHosts) {
_coordinator = new StaticCoordinator(_connections, conf,
_spoutConfig, _state, context.getThisTaskIndex(),
- totalTasks, topologyInstanceId);
+ totalTasks, context.getThisTaskId(), topologyInstanceId);
} else {
_coordinator = new ZkCoordinator(_connections, conf,
_spoutConfig, _state, context.getThisTaskIndex(),
- totalTasks, topologyInstanceId);
+ totalTasks, context.getThisTaskId(), topologyInstanceId);
}
context.registerMetric("kafkaOffset", new IMetric() {
http://git-wip-us.apache.org/repos/asf/storm/blob/28284002/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
index f23c873..73e86e9 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
@@ -254,7 +254,8 @@ public class KafkaUtils {
}
- public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons, int totalTasks, int taskIndex) {
+ public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons,
+ int totalTasks, int taskIndex, int taskId) {
Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");
List<Partition> taskPartitions = new ArrayList<Partition>();
List<Partition> partitions = new ArrayList<Partition>();
@@ -269,20 +270,20 @@ public class KafkaUtils {
Partition taskPartition = partitions.get(i);
taskPartitions.add(taskPartition);
}
- logPartitionMapping(totalTasks, taskIndex, taskPartitions);
+ logPartitionMapping(totalTasks, taskIndex, taskPartitions, taskId);
return taskPartitions;
}
- private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions) {
- String taskPrefix = taskId(taskIndex, totalTasks);
+ private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions, int taskId) {
+ String taskPrefix = taskPrefix(taskIndex, totalTasks, taskId);
if (taskPartitions.isEmpty()) {
- LOG.warn(taskPrefix + "no partitions assigned");
+ LOG.warn(taskPrefix + " no partitions assigned");
} else {
- LOG.info(taskPrefix + "assigned " + taskPartitions);
+ LOG.info(taskPrefix + " assigned " + taskPartitions);
}
}
- public static String taskId(int taskIndex, int totalTasks) {
- return "Task [" + (taskIndex + 1) + "/" + totalTasks + "] ";
+ public static String taskPrefix(int taskIndex, int totalTasks, int taskId) {
+ return "Task [" + (taskIndex + 1) + "/" + totalTasks + "], Task-ID: " + taskId;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/28284002/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
index 628bfc0..c3c5e97 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
@@ -26,11 +26,11 @@ public class StaticCoordinator implements PartitionCoordinator {
Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
List<PartitionManager> _allManagers = new ArrayList<>();
- public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
+ public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, int taskId, String topologyInstanceId) {
StaticHosts hosts = (StaticHosts) config.hosts;
List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
partitions.add(hosts.getPartitionInformation());
- List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex);
+ List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex, taskId);
for (Partition myPartition : myPartitions) {
_managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/28284002/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
index 14be584..d9dbfb3 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
@@ -23,7 +23,7 @@ import org.apache.storm.kafka.trident.GlobalPartitionInformation;
import java.util.*;
-import static org.apache.storm.kafka.KafkaUtils.taskId;
+import static org.apache.storm.kafka.KafkaUtils.taskPrefix;
public class ZkCoordinator implements PartitionCoordinator {
private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class);
@@ -31,6 +31,7 @@ public class ZkCoordinator implements PartitionCoordinator {
SpoutConfig _spoutConfig;
int _taskIndex;
int _totalTasks;
+ int _taskId;
String _topologyInstanceId;
Map<Partition, PartitionManager> _managers = new HashMap();
List<PartitionManager> _cachedList = new ArrayList<PartitionManager>();
@@ -41,15 +42,16 @@ public class ZkCoordinator implements PartitionCoordinator {
ZkState _state;
Map _stormConf;
- public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
- this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig));
+ public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, int taskId, String topologyInstanceId) {
+ this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, taskId, topologyInstanceId, buildReader(stormConf, spoutConfig));
}
- public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) {
+ public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, int taskId, String topologyInstanceId, DynamicBrokersReader reader) {
_spoutConfig = spoutConfig;
_connections = connections;
_taskIndex = taskIndex;
_totalTasks = totalTasks;
+ _taskId = taskId;
_topologyInstanceId = topologyInstanceId;
_stormConf = stormConf;
_state = state;
@@ -75,9 +77,9 @@ public class ZkCoordinator implements PartitionCoordinator {
@Override
public void refresh() {
try {
- LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections");
+ LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Refreshing partition manager connections");
List<GlobalPartitionInformation> brokerInfo = _reader.getBrokerInfo();
- List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex);
+ List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex, _taskId);
Set<Partition> curr = _managers.keySet();
Set<Partition> newPartitions = new HashSet<Partition>(mine);
@@ -86,7 +88,7 @@ public class ZkCoordinator implements PartitionCoordinator {
Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
deletedPartitions.removeAll(mine);
- LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString());
+ LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Deleted partition managers: " + deletedPartitions.toString());
Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
for (Partition id : deletedPartitions) {
@@ -95,7 +97,7 @@ public class ZkCoordinator implements PartitionCoordinator {
for (PartitionManager manager : deletedManagers.values()) {
if (manager != null) manager.close();
}
- LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString());
+ LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " New partition managers: " + newPartitions.toString());
for (Partition id : newPartitions) {
PartitionManager man = new PartitionManager(
@@ -113,7 +115,7 @@ public class ZkCoordinator implements PartitionCoordinator {
throw new RuntimeException(e);
}
_cachedList = new ArrayList<PartitionManager>(_managers.values());
- LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing");
+ LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Finished refreshing");
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/28284002/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
index 9da6c0a..9362f91 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
@@ -271,7 +271,7 @@ public class KafkaUtilsTest {
partitions.add(globalPartitionInformation);
int numTasks = numPartitions / partitionsPerTask;
for (int i = 0 ; i < numTasks ; i++) {
- assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i).size());
+ assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i, i).size());
}
}
@@ -281,8 +281,8 @@ public class KafkaUtilsTest {
List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
partitions.add(globalPartitionInformation);
int numTasks = 2;
- assertEquals(1, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 0).size());
- assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1).size());
+ assertEquals(1, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 0, 0).size());
+ assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1, 1).size());
}
@Test (expected = IllegalArgumentException.class )
@@ -290,6 +290,6 @@ public class KafkaUtilsTest {
GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC);
List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
partitions.add(globalPartitionInformation);
- KafkaUtils.calculatePartitionsForTask(partitions, 1, 1);
+ KafkaUtils.calculatePartitionsForTask(partitions, 1, 1, 1);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/28284002/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
index b23d5bc..0b86845 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
@@ -174,7 +174,7 @@ public class ZkCoordinatorTest {
private List<ZkCoordinator> buildCoordinators(int totalTasks) {
List<ZkCoordinator> coordinatorList = new ArrayList<ZkCoordinator>();
for (int i = 0; i < totalTasks; i++) {
- ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, stormConf, spoutConfig, state, i, totalTasks, "test-id", reader);
+ ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, stormConf, spoutConfig, state, i, totalTasks, i, "test-id", reader);
coordinatorList.add(coordinator);
}
return coordinatorList;
[3/3] storm git commit: STORM-2506: CHANGELOG
Posted by ka...@apache.org.
STORM-2506: CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8473376c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8473376c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8473376c
Branch: refs/heads/1.x-branch
Commit: 8473376c7c63385bfd62e56e963b0da76b15afd0
Parents: c90d140
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Jul 7 12:17:34 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Jul 7 12:17:34 2017 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8473376c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 98a2320..db17339 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.2.0
+ * STORM-2506: Print mapping between Task ID and Kafka Partitions
* STORM-2601: add the timeout parameter to the method of getting the nimbus client
* STORM-2369: [storm-redis] Use binary type for State management
* STORM-2599: Fix BasicContainer wildcard classpath on Windows
[2/3] storm git commit: Merge branch 'STORM-2506--1.x-branch' of
https://github.com/erikdw/storm into STORM-2506-1.x
Posted by ka...@apache.org.
Merge branch 'STORM-2506--1.x-branch' of https://github.com/erikdw/storm into STORM-2506-1.x
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c90d140b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c90d140b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c90d140b
Branch: refs/heads/1.x-branch
Commit: c90d140bed6f2b578e0a6cb130ab3546148ccb42
Parents: c66a916 2828400
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Jul 7 12:15:46 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Jul 7 12:15:46 2017 +0900
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 4 ++--
.../jvm/org/apache/storm/kafka/KafkaSpout.java | 4 ++--
.../jvm/org/apache/storm/kafka/KafkaUtils.java | 17 +++++++++--------
.../apache/storm/kafka/StaticCoordinator.java | 4 ++--
.../org/apache/storm/kafka/ZkCoordinator.java | 20 +++++++++++---------
.../org/apache/storm/kafka/KafkaUtilsTest.java | 8 ++++----
.../apache/storm/kafka/ZkCoordinatorTest.java | 2 +-
7 files changed, 31 insertions(+), 28 deletions(-)
----------------------------------------------------------------------