You are viewing a plain-text version of this content; the canonical link to the original message was not preserved in this copy.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/03/06 06:24:37 UTC
git commit: TAJO-650: Repartitioner::scheduleHashShuffledFetches
should adjust the number of tasks.
Repository: incubator-tajo
Updated Branches:
refs/heads/master ed4ba76f4 -> 462abc471
TAJO-650: Repartitioner::scheduleHashShuffledFetches should adjust the number of tasks.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/462abc47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/462abc47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/462abc47
Branch: refs/heads/master
Commit: 462abc47113a5c41fed783c34cd5b35a8eae4758
Parents: ed4ba76
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu Mar 6 14:16:05 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Mar 6 14:16:05 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 3 +++
.../main/java/org/apache/tajo/util/TUtil.java | 9 +++++++
.../tajo/master/querymaster/Repartitioner.java | 28 +++++++++-----------
3 files changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/462abc47/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c17c00d..d7f5882 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -269,6 +269,9 @@ Release 0.8.0 - unreleased
BUG FIXES
+ TAJO-650: Repartitioner::scheduleHashShuffledFetches should adjust the
+ number of tasks. (hyunsik)
+
TAJO-648: TajoWorker does not send correct QM rpc and client rpc ports
via heartbeat. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/462abc47/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
index 310d187..1eb55bc 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
@@ -159,6 +159,15 @@ public class TUtil {
}
}
+ public static <KEY1, VALUE> void putCollectionToNestedList(Map<KEY1, List<VALUE>> map, KEY1 k1,
+ Collection<VALUE> list) {
+ if (map.containsKey(k1)) {
+ map.get(k1).addAll(list);
+ } else {
+ map.put(k1, TUtil.newList(list));
+ }
+ }
+
public static <KEY1, KEY2, VALUE> void putToNestedMap(Map<KEY1, Map<KEY2, VALUE>> map, KEY1 k1, KEY2 k2,
VALUE value) {
if (map.containsKey(k1)) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/462abc47/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 7d7ecad..36203bb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -369,11 +369,11 @@ public class Repartitioner {
}
boolean ascendingFirstKey = sortSpecs[0].isAscending();
- SortedMap<TupleRange, Set<URI>> map;
+ SortedMap<TupleRange, Collection<URI>> map;
if (ascendingFirstKey) {
- map = new TreeMap<TupleRange, Set<URI>>();
+ map = new TreeMap<TupleRange, Collection<URI>>();
} else {
- map = new TreeMap<TupleRange, Set<URI>>(new TupleRange.DescendingTupleRangeComparator());
+ map = new TreeMap<TupleRange, Collection<URI>>(new TupleRange.DescendingTupleRangeComparator());
}
Set<URI> uris;
@@ -398,7 +398,7 @@ public class Repartitioner {
schedulerContext.setEstimatedTaskNum(determinedTaskNum);
}
- public static void scheduleFetchesByRoundRobin(SubQuery subQuery, Map<?, Set<URI>> partitions,
+ public static void scheduleFetchesByRoundRobin(SubQuery subQuery, Map<?, Collection<URI>> partitions,
String tableName, int num) {
int i;
Map<String, List<URI>>[] fetchesArray = new Map[num];
@@ -406,9 +406,9 @@ public class Repartitioner {
fetchesArray[i] = new HashMap<String, List<URI>>();
}
i = 0;
- for (Entry<?, Set<URI>> entry : partitions.entrySet()) {
- Set<URI> value = entry.getValue();
- fetchesArray[i++].put(tableName, Lists.newArrayList(value));
+ for (Entry<?, Collection<URI>> entry : partitions.entrySet()) {
+ Collection<URI> value = entry.getValue();
+ TUtil.putCollectionToNestedList(fetchesArray[i++], tableName, value);
if (i == num) i = 0;
}
for (Map<String, List<URI>> eachFetches : fetchesArray) {
@@ -451,7 +451,7 @@ public class Repartitioner {
SubQuery.scheduleFragments(subQuery, fragments);
Map<String, List<IntermediateEntry>> hashedByHost;
- Map<Integer, List<URI>> finalFetchURI = new HashMap<Integer, List<URI>>();
+ Map<Integer, Collection<URI>> finalFetchURI = new HashMap<Integer, Collection<URI>>();
for (ExecutionBlock block : masterPlan.getChilds(execBlock)) {
List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
@@ -477,7 +477,7 @@ public class Repartitioner {
}
GroupbyNode groupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.GROUP_BY);
- // the number of tasks cannot exceed the number of merged fetch uris.
+ // get a proper number of tasks
int determinedTaskNum = Math.min(maxNum, finalFetchURI.size());
LOG.info("ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetchURI.size());
if (groupby != null && groupby.getGroupingColumns().length == 0) {
@@ -485,14 +485,10 @@ public class Repartitioner {
LOG.info("No Grouping Column - determinedTaskNum is set to 1");
}
- for (Entry<Integer, List<URI>> entry : finalFetchURI.entrySet()) {
- List<URI> value = entry.getValue();
- Map<String, List<URI>> fetches = new HashMap<String, List<URI>>();
- fetches.put(scan.getTableName(), value);
- SubQuery.scheduleFetches(subQuery, fetches);
- }
-
+ // set the proper number of tasks to the estimated task num
schedulerContext.setEstimatedTaskNum(determinedTaskNum);
+ // divide fetch uris into the proper number of tasks in a round robin manner.
+ scheduleFetchesByRoundRobin(subQuery, finalFetchURI, scan.getTableName(), determinedTaskNum);
LOG.info("DeterminedTaskNum : " + determinedTaskNum);
}