You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/11/02 23:19:09 UTC
[02/17] storm git commit: Partitions were not getting assigned equally among tasks
Partitions were not getting assigned equally among tasks
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a989a4c3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a989a4c3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a989a4c3
Branch: refs/heads/master
Commit: a989a4c30fad07becfac30760f4b858e7b31cd3a
Parents: 455435a
Author: Sumit Chawla <su...@gmail.com>
Authored: Tue May 26 12:35:58 2015 -0700
Committer: Sumit Chawla <su...@gmail.com>
Committed: Tue May 26 12:35:58 2015 -0700
----------------------------------------------------------------------
.../src/jvm/storm/kafka/KafkaUtils.java | 21 ++++++++++----------
1 file changed, 11 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a989a4c3/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 38f4855..147503a 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -232,18 +232,19 @@ public class KafkaUtils {
public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons, int totalTasks, int taskIndex) {
Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");
List<Partition> taskPartitions = new ArrayList<Partition>();
+ List<Partition> partitions = new ArrayList<Partition>();
for(GlobalPartitionInformation partitionInformation : partitons) {
- List<Partition> partitions = partitionInformation.getOrderedPartitions();
- int numPartitions = partitions.size();
- if (numPartitions < totalTasks) {
- LOG.warn(partitionInformation.topic + "there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions + "), some tasks will be idle");
- }
- for (int i = taskIndex; i < numPartitions; i += totalTasks) {
- Partition taskPartition = partitions.get(i);
- taskPartitions.add(taskPartition);
- }
- logPartitionMapping(totalTasks, taskIndex, taskPartitions);
+ partitions.addAll(partitionInformation.getOrderedPartitions());
+ }
+ int numPartitions = partitions.size();
+ if (numPartitions < totalTasks) {
+ LOG.warn("there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions + "), some tasks will be idle");
+ }
+ for (int i = taskIndex; i < numPartitions; i += totalTasks) {
+ Partition taskPartition = partitions.get(i);
+ taskPartitions.add(taskPartition);
}
+ logPartitionMapping(totalTasks, taskIndex, taskPartitions);
return taskPartitions;
}