You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/03/06 06:24:37 UTC

git commit: TAJO-650: Repartitioner::scheduleHashShuffledFetches should adjust the number of tasks.

Repository: incubator-tajo
Updated Branches:
  refs/heads/master ed4ba76f4 -> 462abc471


TAJO-650: Repartitioner::scheduleHashShuffledFetches should adjust the number of tasks.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/462abc47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/462abc47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/462abc47

Branch: refs/heads/master
Commit: 462abc47113a5c41fed783c34cd5b35a8eae4758
Parents: ed4ba76
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu Mar 6 14:16:05 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Mar 6 14:16:05 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +++
 .../main/java/org/apache/tajo/util/TUtil.java   |  9 +++++++
 .../tajo/master/querymaster/Repartitioner.java  | 28 +++++++++-----------
 3 files changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/462abc47/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c17c00d..d7f5882 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -269,6 +269,9 @@ Release 0.8.0 - unreleased
 
   BUG FIXES
 
+    TAJO-650: Repartitioner::scheduleHashShuffledFetches should adjust the
+    number of tasks. (hyunsik)
+
     TAJO-648: TajoWorker does not send correct QM rpc and client rpc ports
     via heartbeat. (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/462abc47/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
index 310d187..1eb55bc 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
@@ -159,6 +159,15 @@ public class TUtil {
     }
   }
 
+  public static <KEY1, VALUE> void putCollectionToNestedList(Map<KEY1, List<VALUE>> map, KEY1 k1,
+                                                             Collection<VALUE> list) {
+    if (map.containsKey(k1)) {
+      map.get(k1).addAll(list);
+    } else {
+      map.put(k1, TUtil.newList(list));
+    }
+  }
+
   public static <KEY1, KEY2, VALUE> void putToNestedMap(Map<KEY1, Map<KEY2, VALUE>> map, KEY1 k1, KEY2 k2,
                                                         VALUE value) {
     if (map.containsKey(k1)) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/462abc47/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 7d7ecad..36203bb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -369,11 +369,11 @@ public class Repartitioner {
     }
 
     boolean ascendingFirstKey = sortSpecs[0].isAscending();
-    SortedMap<TupleRange, Set<URI>> map;
+    SortedMap<TupleRange, Collection<URI>> map;
     if (ascendingFirstKey) {
-      map = new TreeMap<TupleRange, Set<URI>>();
+      map = new TreeMap<TupleRange, Collection<URI>>();
     } else {
-      map = new TreeMap<TupleRange, Set<URI>>(new TupleRange.DescendingTupleRangeComparator());
+      map = new TreeMap<TupleRange, Collection<URI>>(new TupleRange.DescendingTupleRangeComparator());
     }
 
     Set<URI> uris;
@@ -398,7 +398,7 @@ public class Repartitioner {
     schedulerContext.setEstimatedTaskNum(determinedTaskNum);
   }
 
-  public static void scheduleFetchesByRoundRobin(SubQuery subQuery, Map<?, Set<URI>> partitions,
+  public static void scheduleFetchesByRoundRobin(SubQuery subQuery, Map<?, Collection<URI>> partitions,
                                                    String tableName, int num) {
     int i;
     Map<String, List<URI>>[] fetchesArray = new Map[num];
@@ -406,9 +406,9 @@ public class Repartitioner {
       fetchesArray[i] = new HashMap<String, List<URI>>();
     }
     i = 0;
-    for (Entry<?, Set<URI>> entry : partitions.entrySet()) {
-      Set<URI> value = entry.getValue();
-      fetchesArray[i++].put(tableName, Lists.newArrayList(value));
+    for (Entry<?, Collection<URI>> entry : partitions.entrySet()) {
+      Collection<URI> value = entry.getValue();
+      TUtil.putCollectionToNestedList(fetchesArray[i++], tableName, value);
       if (i == num) i = 0;
     }
     for (Map<String, List<URI>> eachFetches : fetchesArray) {
@@ -451,7 +451,7 @@ public class Repartitioner {
     SubQuery.scheduleFragments(subQuery, fragments);
 
     Map<String, List<IntermediateEntry>> hashedByHost;
-    Map<Integer, List<URI>> finalFetchURI = new HashMap<Integer, List<URI>>();
+    Map<Integer, Collection<URI>> finalFetchURI = new HashMap<Integer, Collection<URI>>();
 
     for (ExecutionBlock block : masterPlan.getChilds(execBlock)) {
       List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
@@ -477,7 +477,7 @@ public class Repartitioner {
     }
 
     GroupbyNode groupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.GROUP_BY);
-    // the number of tasks cannot exceed the number of merged fetch uris.
+    // get a proper number of tasks
     int determinedTaskNum = Math.min(maxNum, finalFetchURI.size());
     LOG.info("ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetchURI.size());
     if (groupby != null && groupby.getGroupingColumns().length == 0) {
@@ -485,14 +485,10 @@ public class Repartitioner {
       LOG.info("No Grouping Column - determinedTaskNum is set to 1");
     }
 
-    for (Entry<Integer, List<URI>> entry : finalFetchURI.entrySet()) {
-      List<URI> value = entry.getValue();
-      Map<String, List<URI>> fetches = new HashMap<String, List<URI>>();
-      fetches.put(scan.getTableName(), value);
-      SubQuery.scheduleFetches(subQuery, fetches);
-    }
-
+    // set the proper number of tasks to the estimated task num
     schedulerContext.setEstimatedTaskNum(determinedTaskNum);
+    // divide fetch uris into the proper number of tasks in a round robin manner.
+    scheduleFetchesByRoundRobin(subQuery, finalFetchURI, scan.getTableName(), determinedTaskNum);
     LOG.info("DeterminedTaskNum : " + determinedTaskNum);
   }