You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/11/02 23:19:09 UTC

[02/17] storm git commit: Partitions were not getting assigned equally among tasks

Partitions were not getting assigned equally among tasks


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a989a4c3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a989a4c3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a989a4c3

Branch: refs/heads/master
Commit: a989a4c30fad07becfac30760f4b858e7b31cd3a
Parents: 455435a
Author: Sumit Chawla <su...@gmail.com>
Authored: Tue May 26 12:35:58 2015 -0700
Committer: Sumit Chawla <su...@gmail.com>
Committed: Tue May 26 12:35:58 2015 -0700

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/KafkaUtils.java         | 21 ++++++++++----------
 1 file changed, 11 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a989a4c3/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 38f4855..147503a 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -232,18 +232,19 @@ public class KafkaUtils {
     public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons, int totalTasks, int taskIndex) {
         Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");
         List<Partition> taskPartitions = new ArrayList<Partition>();
+        List<Partition> partitions = new ArrayList<Partition>();
         for(GlobalPartitionInformation partitionInformation : partitons) {
-            List<Partition> partitions = partitionInformation.getOrderedPartitions();
-            int numPartitions = partitions.size();
-            if (numPartitions < totalTasks) {
-                LOG.warn(partitionInformation.topic + "there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions + "), some tasks will be idle");
-            }
-            for (int i = taskIndex; i < numPartitions; i += totalTasks) {
-                Partition taskPartition = partitions.get(i);
-                taskPartitions.add(taskPartition);
-            }
-            logPartitionMapping(totalTasks, taskIndex, taskPartitions);
+            partitions.addAll(partitionInformation.getOrderedPartitions());
+        }
+        int numPartitions = partitions.size();
+        if (numPartitions < totalTasks) {
+            LOG.warn("there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions + "), some tasks will be idle");
+        }
+        for (int i = taskIndex; i < numPartitions; i += totalTasks) {
+            Partition taskPartition = partitions.get(i);
+            taskPartitions.add(taskPartition);
         }
+        logPartitionMapping(totalTasks, taskIndex, taskPartitions);
         return taskPartitions;
     }