You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/05/09 09:36:37 UTC
git commit: TAJO-58: Remove obsolete methods in GlobalPlanner.
(hyunsik)
Updated Branches:
refs/heads/master 0652815f5 -> 2923f4e3b
TAJO-58: Remove obsolete methods in GlobalPlanner. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/2923f4e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/2923f4e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/2923f4e3
Branch: refs/heads/master
Commit: 2923f4e3b5812f25e73e26c2af0add4262c6a07e
Parents: 0652815
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu May 9 16:34:29 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu May 9 16:35:31 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 4 +
.../src/main/java/tajo/master/GlobalPlanner.java | 538 +--------------
.../engine/plan/global/TestGlobalQueryPlanner.java | 143 ----
3 files changed, 8 insertions(+), 677 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2923f4e3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cba4f8f..7dd10de 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@ Release 0.2.0 - unreleased
NEW FEATURES
+ TAJO-57: Recognize Parser and Catalog Standard SQL data types. (hyunsik)
+
TAJO-33: Implement a basic query progress indicator. (hyunsik)
IMPROVEMENTS
@@ -37,6 +39,8 @@ Release 0.2.0 - unreleased
BUG FIXES
+ TAJO-58: Remove obsolete methods in GlobalPlanner. (hyunsik)
+
TAJO-54: SubQuery::allocateContainers() may ask 0 containers. (hyunsik)
TAJO-47: RowFile has the duplicated initialization problem and unflipped
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2923f4e3/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
index 376f15d..2176b8e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
@@ -18,39 +18,28 @@
package tajo.master;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import tajo.QueryId;
import tajo.QueryIdFactory;
-import tajo.QueryUnitAttemptId;
import tajo.SubQueryId;
import tajo.catalog.*;
import tajo.catalog.proto.CatalogProtos.StoreType;
-import tajo.common.exception.NotImplementedException;
import tajo.conf.TajoConf;
import tajo.engine.parser.QueryBlock.FromTable;
import tajo.engine.planner.PlannerUtil;
import tajo.engine.planner.global.MasterPlan;
import tajo.engine.planner.logical.*;
-import tajo.engine.utils.TupleUtil;
import tajo.master.ExecutionBlock.PartitionType;
-import tajo.storage.Fragment;
import tajo.storage.StorageManager;
-import tajo.storage.TupleRange;
import tajo.util.TajoIdUtils;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.util.*;
-import java.util.Map.Entry;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
public class GlobalPlanner {
private static Log LOG = LogFactory.getLog(GlobalPlanner.class);
@@ -690,75 +679,6 @@ public class GlobalPlanner {
prevOutputType);
}
}
-
- @VisibleForTesting
- public ExecutionBlock createMultilevelGroupby(
- ExecutionBlock firstPhaseGroupby, Column[] keys)
- throws CloneNotSupportedException, IOException {
- ExecutionBlock secondPhaseGroupby = firstPhaseGroupby.getParentBlock();
- Preconditions.checkState(secondPhaseGroupby.getScanNodes().length == 1);
-
- ScanNode secondScan = secondPhaseGroupby.getScanNodes()[0];
- GroupbyNode secondGroupby = (GroupbyNode) secondPhaseGroupby.
- getStoreTableNode().getSubNode();
- ExecutionBlock newPhaseGroupby = new ExecutionBlock(
- QueryIdFactory.newSubQueryId(firstPhaseGroupby.getId().getQueryId()));
- LogicalNode tmp = PlannerUtil.findTopParentNode(
- firstPhaseGroupby.getPlan(), ExprType.GROUP_BY);
- GroupbyNode firstGroupby;
- if (tmp instanceof UnaryNode) {
- firstGroupby = (GroupbyNode) ((UnaryNode)tmp).getSubNode();
- GroupbyNode newFirstGroupby = GlobalPlannerUtils.newGroupbyPlan(
- firstGroupby.getInSchema(),
- firstGroupby.getOutSchema(),
- keys,
- firstGroupby.getHavingCondition(),
- firstGroupby.getTargets()
- );
- newFirstGroupby.setSubNode(firstGroupby.getSubNode());
- ((UnaryNode) tmp).setSubNode(newFirstGroupby);
- }
-
- // create a new SubQuery containing the group by plan
- StoreTableNode newStore = GlobalPlannerUtils.newStorePlan(
- secondScan.getInSchema(),
- newPhaseGroupby.getId().toString());
- newStore.setLocal(true);
- ScanNode newScan = GlobalPlannerUtils.newScanPlan(
- firstPhaseGroupby.getOutputSchema(),
- firstPhaseGroupby.getOutputName(),
- sm.getTablePath(firstPhaseGroupby.getOutputName()));
- newScan.setLocal(true);
- GroupbyNode newGroupby = GlobalPlannerUtils.newGroupbyPlan(
- newScan.getOutSchema(),
- newStore.getInSchema(),
- keys,
- secondGroupby.getHavingCondition(),
- secondGroupby.getTargets());
- newGroupby.setSubNode(newScan);
- newStore.setSubNode(newGroupby);
- newPhaseGroupby.setPlan(newStore);
-
- secondPhaseGroupby.removeChildBlock(secondScan);
-
- // update the scan node of last phase
- secondScan = GlobalPlannerUtils.newScanPlan(secondScan.getInSchema(),
- newPhaseGroupby.getOutputName(),
- sm.getTablePath(newPhaseGroupby.getOutputName()));
- secondScan.setLocal(true);
- secondGroupby.setSubNode(secondScan);
- secondPhaseGroupby.setPlan(secondPhaseGroupby.getPlan());
-
- // insert the new SubQuery
- // between the first phase and the second phase
- secondPhaseGroupby.addChildBlock(secondScan, newPhaseGroupby);
- newPhaseGroupby.addChildBlock(newPhaseGroupby.getScanNodes()[0],
- firstPhaseGroupby);
- newPhaseGroupby.setParentBlock(secondPhaseGroupby);
- firstPhaseGroupby.setParentBlock(newPhaseGroupby);
-
- return newPhaseGroupby;
- }
private LogicalNode insertOuterScan(BinaryNode parent, String tableId,
TableMeta meta) throws IOException {
@@ -798,454 +718,4 @@ public class GlobalPlanner {
}
return new MasterPlan(root);
}
-
- /**
- * 2개의 scan을 가진 QueryUnit들에 fragment와 fetch를 할당
- *
- * @param subQuery
- * @param n
- * @param fragMap
- * @param fetchMap
- * @return
- */
- private List<QueryUnit> makeBinaryQueryUnit(SubQuery subQuery, final int n,
- Map<ScanNode, List<Fragment>> fragMap,
- Map<ScanNode, List<URI>> fetchMap) {
- ExecutionBlock execBlock = subQuery.getBlock();
- ScanNode[] scans = execBlock.getScanNodes();
- List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
- final int maxQueryUnitNum = n;
-
- if (execBlock.hasChildBlock()) {
- ExecutionBlock prev = execBlock.getChildBlock(scans[0]);
- switch (prev.getPartitionType()) {
- case BROADCAST:
- throw new NotImplementedException();
- case HASH:
- if (scans[0].isLocal()) {
- queryUnits = assignFetchesToBinaryByHash(subQuery, queryUnits,
- fetchMap, maxQueryUnitNum);
- queryUnits = assignEqualFragment(queryUnits, fragMap);
- } else {
- throw new NotImplementedException();
- }
- break;
- case LIST:
- throw new NotImplementedException();
- }
- } else {
- queryUnits = makeQueryUnitsForBinaryPlan(subQuery,
- queryUnits, fragMap);
- }
-
- return queryUnits;
- }
-
- public List<QueryUnit> makeQueryUnitsForBinaryPlan(
- SubQuery subQuery, List<QueryUnit> queryUnits,
- Map<ScanNode, List<Fragment>> fragmentMap) {
- ExecutionBlock execBlock = subQuery.getBlock();
- QueryUnit queryUnit;
- if (execBlock.hasJoin()) {
- // make query units for every composition of fragments of each scan
- Preconditions.checkArgument(fragmentMap.size()==2);
-
- ScanNode [] scanNodes = execBlock.getScanNodes();
- String innerId = null, outerId = null;
-
- // If one relation is set to broadcast, it meaning that the relation
- // is less than one block size. That is, the relation has only
- // one fragment. If this assumption is kept, the below code is always
- // correct.
- if (scanNodes[0].isBroadcast() || scanNodes[1].isBroadcast()) {
- List<Fragment> broadcastFrag = null;
- List<Fragment> baseFrag = null;
- if (scanNodes[0].isBroadcast()) {
- broadcastFrag = fragmentMap.get(scanNodes[0]);
- baseFrag = fragmentMap.get(scanNodes[1]);
-
- innerId = scanNodes[0].getTableId();
- outerId = scanNodes[1].getTableId();
- } else if (scanNodes[1].isBroadcast()) {
- broadcastFrag = fragmentMap.get(scanNodes[1]);
- baseFrag = fragmentMap.get(scanNodes[0]);
-
- innerId = scanNodes[1].getTableId();
- outerId = scanNodes[0].getTableId();
- }
-
- for (Fragment outer : baseFrag) {
- queryUnit = newQueryUnit(subQuery);
- queryUnit.setFragment(outerId, outer);
- for (Fragment inner : broadcastFrag) {
- queryUnit.setFragment(innerId, inner);
- }
- queryUnits.add(queryUnit);
- }
- } else {
- List<Fragment> innerFrags, outerFrags;
- Iterator<Entry<ScanNode, List<Fragment>>> it =
- fragmentMap.entrySet().iterator();
- Entry<ScanNode, List<Fragment>> e = it.next();
- innerId = e.getKey().getTableId();
- innerFrags = e.getValue();
- e = it.next();
- outerId = e.getKey().getTableId();
- outerFrags = e.getValue();
- for (Fragment outer : outerFrags) {
- for (Fragment inner : innerFrags) {
- queryUnit = newQueryUnit(subQuery);
- queryUnit.setFragment(innerId, inner);
- queryUnit.setFragment(outerId, outer);
- queryUnits.add(queryUnit);
- }
- }
- }
- }
-
- return queryUnits;
- }
-
- private List<QueryUnit> makeQueryUnitsForEachFragment(
- SubQuery subQuery, List<QueryUnit> queryUnits,
- ScanNode scan, List<Fragment> fragments) {
- QueryUnit queryUnit;
- for (Fragment fragment : fragments) {
- queryUnit = newQueryUnit(subQuery);
- queryUnit.setFragment(scan.getTableId(), fragment);
- queryUnits.add(queryUnit);
- }
- return queryUnits;
- }
-
- private QueryUnit newQueryUnit(SubQuery subQuery) {
- ExecutionBlock execBlock = subQuery.getBlock();
- QueryUnit unit = new QueryUnit(
- QueryIdFactory.newQueryUnitId(subQuery.getId()), execBlock.isLeafBlock(),
- subQuery.getEventHandler());
- unit.setLogicalPlan(execBlock.getPlan());
- return unit;
- }
-
- /**
- * Binary QueryUnit들에 hash 파티션된 fetch를 할당
- *
- * @param subQuery
- * @param unitList
- * @param fetchMap
- * @param n
- * @return
- */
- private List<QueryUnit> assignFetchesToBinaryByHash(SubQuery subQuery,
- List<QueryUnit> unitList, Map<ScanNode, List<URI>> fetchMap, final int n) {
- QueryUnit unit;
- int i = 0;
- Map<String, Map<ScanNode, List<URI>>> hashed = hashFetches(fetchMap);
- Iterator<Entry<String, Map<ScanNode, List<URI>>>> it =
- hashed.entrySet().iterator();
- Entry<String, Map<ScanNode, List<URI>>> e;
- while (it.hasNext()) {
- e = it.next();
- if (e.getValue().size() == 2) { // only if two matched partitions
- if (unitList.size() == n) {
- unit = unitList.get(i++);
- if (i == unitList.size()) {
- i = 0;
- }
- } else {
- unit = newQueryUnit(subQuery);
- unitList.add(unit);
- }
- Map<ScanNode, List<URI>> m = e.getValue();
- for (ScanNode scan : m.keySet()) {
- for (URI uri : m.get(scan)) {
- unit.addFetch(scan.getTableId(), uri);
- }
- }
- }
- }
-
- return unitList;
- }
-
- /**
- * Unary QueryUnit들에 hash 파티션된 fetch를 할당
- *
- * @param subQuery
- * @param unitList
- * @param scan
- * @param uriList
- * @param n
- * @return
- */
- private List<QueryUnit> assignFetchesToUnaryByHash(SubQuery subQuery,
- List<QueryUnit> unitList, ScanNode scan, List<URI> uriList, int n) {
- Map<String, List<URI>> hashed = hashFetches(subQuery.getId(), uriList); // hash key, uri
- QueryUnit unit;
- int i = 0;
- // TODO: units에 hashed 할당
- Iterator<Entry<String, List<URI>>> it = hashed.entrySet().iterator();
- Entry<String, List<URI>> e;
- while (it.hasNext()) {
- e = it.next();
- if (unitList.size() == n) {
- unit = unitList.get(i++);
- if (i == unitList.size()) {
- i = 0;
- }
- } else {
- unit = newQueryUnit(subQuery);
- unitList.add(unit);
- }
- unit.addFetches(scan.getTableId(), e.getValue());
- }
- return unitList;
- }
-
- private List<QueryUnit> assignFetchesByRange(SubQuery subQuery,
- List<QueryUnit> unitList,
- ScanNode scan,
- List<URI> uriList,
- int n,
- Schema rangeSchema,
- boolean ascendingFirstKey)
- throws UnsupportedEncodingException {
- Map<TupleRange, Set<URI>> partitions =
- rangeFetches(rangeSchema, uriList, ascendingFirstKey);
- // grouping urls by range
- QueryUnit unit;
- int i = 0;
- Iterator<Entry<TupleRange, Set<URI>>> it = partitions.entrySet().iterator();
- Entry<TupleRange, Set<URI>> e;
- while (it.hasNext()) {
- e = it.next();
- if (unitList.size() == n) {
- unit = unitList.get(i++);
- if (i == unitList.size()) {
- i = 0;
- }
- } else {
- unit = newQueryUnit(subQuery);
- unitList.add(unit);
- }
- unit.addFetches(scan.getTableId(), e.getValue());
- }
- return unitList;
- }
-
- /**
- * Unary QueryUnit들에 list 파티션된 fetch를 할당
- *
- * @param subQuery
- * @param unitList
- * @param scan
- * @param uriList
- * @param n
- * @return
- */
- private List<QueryUnit> assignFetchesByRoundRobin(SubQuery subQuery,
- List<QueryUnit> unitList, ScanNode scan, List<URI> uriList, int n) {
- QueryUnit unit;
- int i = 0;
- for (URI uri : uriList) {
- if (unitList.size() < n) {
- unit = newQueryUnit(subQuery);
- unitList.add(unit);
- } else {
- unit = unitList.get(i++);
- if (i == unitList.size()) {
- i = 0;
- }
- }
- unit.addFetch(scan.getTableId(), uri);
- }
- return unitList;
- }
-
- @VisibleForTesting
- public static Map<String, Map<ScanNode, List<URI>>> hashFetches(Map<ScanNode, List<URI>> uriMap) {
- SortedMap<String, Map<ScanNode, List<URI>>> hashed =
- new TreeMap<String, Map<ScanNode, List<URI>>>();
- String uriPath, key;
- Map<ScanNode, List<URI>> m;
- List<URI> uriList;
- for (Entry<ScanNode, List<URI>> e : uriMap.entrySet()) {
- for (URI uri : e.getValue()) {
- uriPath = uri.toString();
- key = uriPath.substring(uriPath.lastIndexOf("=")+1);
- if (hashed.containsKey(key)) {
- m = hashed.get(key);
- } else {
- m = new HashMap<ScanNode, List<URI>>();
- }
- if (m.containsKey(e.getKey())) {
- uriList = m.get(e.getKey());
- } else {
- uriList = new ArrayList<URI>();
- }
- uriList.add(uri);
- m.put(e.getKey(), uriList);
- hashed.put(key, m);
- }
- }
-
- SortedMap<String, Map<ScanNode, List<URI>>> finalHashed = new TreeMap<String, Map<ScanNode,List<URI>>>();
- for (Entry<String, Map<ScanNode, List<URI>>> entry : hashed.entrySet()) {
- finalHashed.put(entry.getKey(), combineURIByHostForBinary(entry.getValue()));
- }
-
- return finalHashed;
- }
-
- private static Map<ScanNode, List<URI>> combineURIByHostForBinary(Map<ScanNode, List<URI>> hashed) {
- Map<ScanNode, List<URI>> finalHashed = Maps.newHashMap();
- for (Entry<ScanNode, List<URI>> urisByKey : hashed.entrySet()) {
- Map<String, List<String>> param = new QueryStringDecoder(urisByKey.getValue().get(0)).getParameters();
- QueryUnitAttemptId quid = new QueryUnitAttemptId(
- new QueryStringDecoder(urisByKey.getValue().get(0)).getParameters().get("qid").get(0));
- SubQueryId sid = quid.getSubQueryId();
- String fn = param.get("fn").get(0);
- Map<String, List<String>> quidByHost = Maps.newHashMap();
- for(URI uri : urisByKey.getValue()) {
- final Map<String,List<String>> params =
- new QueryStringDecoder(uri).getParameters();
- if (quidByHost.containsKey(uri.getHost() + ":" + uri.getPort())) {
- quidByHost.get(uri.getHost() + ":" + uri.getPort()).add(params.get("qid").get(0));
- } else {
- quidByHost.put(uri.getHost() + ":" + uri.getPort(), Lists.newArrayList(params.get("qid")));
- }
- }
-
- finalHashed.put(urisByKey.getKey(), mergeURI(quidByHost, sid.toString(), fn));
- }
-
- return finalHashed;
- }
-
-
- @VisibleForTesting
- public static Map<String, List<URI>> hashFetches(SubQueryId sid, List<URI> uriList) {
- SortedMap<String, List<URI>> hashed = new TreeMap<String, List<URI>>();
- String uriPath, key;
- for (URI uri : uriList) {
- // TODO
- uriPath = uri.toString();
- key = uriPath.substring(uriPath.lastIndexOf("=")+1);
- if (hashed.containsKey(key)) {
- hashed.get(key).add(uri);
- } else {
- List<URI> list = new ArrayList<URI>();
- list.add(uri);
- hashed.put(key, list);
- }
- }
-
- return combineURIByHost(hashed);
- }
-
- private static Map<String, List<URI>> combineURIByHost(Map<String, List<URI>> hashed) {
- Map<String, List<URI>> finalHashed = Maps.newTreeMap();
- for (Entry<String, List<URI>> urisByKey : hashed.entrySet()) {
- QueryUnitAttemptId quid = new QueryUnitAttemptId(
- new QueryStringDecoder(urisByKey.getValue().get(0)).getParameters().get("qid").get(0));
- SubQueryId sid = quid.getSubQueryId();
- Map<String,List<String>> quidByHost = Maps.newHashMap();
- for(URI uri : urisByKey.getValue()) {
- final Map<String,List<String>> params =
- new QueryStringDecoder(uri).getParameters();
- if (quidByHost.containsKey(uri.getHost() + ":" + uri.getPort())) {
- quidByHost.get(uri.getHost() + ":" + uri.getPort()).add(params.get("qid").get(0));
- } else {
- quidByHost.put(uri.getHost() + ":" + uri.getPort(), Lists.newArrayList(params.get("qid")));
- }
- }
- finalHashed.put(urisByKey.getKey(), mergeURI(quidByHost, sid.toString(), urisByKey.getKey()));
- }
- return finalHashed;
- }
-
- private static List<URI> mergeURI(Map<String, List<String>> quidByKey, String sid, String fn) {
- List<URI> uris = Lists.newArrayList();
- for (Entry<String, List<String>> quidByHost : quidByKey.entrySet()) {
- StringBuilder sb = new StringBuilder("http://")
- .append(quidByHost.getKey()).append("/").append("?fn="+fn).append("&sid="+sid);
- sb.append("&qid=");
- boolean first = true;
- for (String qid : quidByHost.getValue()) {
- if (first) {
- first = false;
- } else {
- sb.append(",");
- }
-
- QueryUnitAttemptId quid = new QueryUnitAttemptId(qid);
- sb.append(quid.getQueryUnitId().getId() + "_");
- sb.append(quid.getId());
- }
- uris.add(URI.create(sb.toString()));
- }
- return uris;
- }
-
- private Map<TupleRange, Set<URI>> rangeFetches(final Schema schema,
- final List<URI> uriList,
- final boolean ascendingFirstKey)
- throws UnsupportedEncodingException {
- SortedMap<TupleRange, Set<URI>> map;
- if (ascendingFirstKey) {
- map = new TreeMap<TupleRange, Set<URI>>();
- } else {
- map = new TreeMap<TupleRange, Set<URI>>(new TupleRange.DescendingTupleRangeComparator());
- }
- TupleRange range;
- Set<URI> uris;
- for (URI uri : uriList) {
- // URI.getQuery() returns a url-decoded query string.
- range = TupleUtil.queryToRange(schema, uri.getQuery());
- if (map.containsKey(range)) {
- uris = map.get(range);
- uris.add(uri);
- } else {
- uris = Sets.newHashSet();
- uris.add(uri);
- map.put(range, uris);
- }
- }
-
- return map;
- }
-
- /**
- * Unary QueryUnit들에 대하여 동일한 fragment를 할당
- *
- * @param unitList
- * @param scan
- * @param frag
- * @return
- */
- private List<QueryUnit> assignEqualFragment(List<QueryUnit> unitList,
- ScanNode scan, Fragment frag) {
- for (int i = 0; i < unitList.size(); i++) {
- unitList.get(i).setFragment(scan.getTableId(), frag);
- }
-
- return unitList;
- }
-
- /**
- * Binary QueryUnit들에 대하여 scan별로 동일한 fragment를 할당
- * @param unitList
- * @param fragMap
- * @return
- */
- private List<QueryUnit> assignEqualFragment(List<QueryUnit> unitList,
- Map<ScanNode, List<Fragment>> fragMap) {
- for (int i = 0; i < unitList.size(); i++) {
- for (ScanNode scan : fragMap.keySet()) {
- unitList.get(i).setFragment(scan.getTableId(),
- fragMap.get(scan).get(0));
- }
- }
- return unitList;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2923f4e3/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java
index ebd9d07..efe0562 100644
--- a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java
@@ -18,13 +18,10 @@
package tajo.engine.plan.global;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.zookeeper.KeeperException;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -40,7 +37,6 @@ import tajo.datum.Datum;
import tajo.datum.DatumFactory;
import tajo.engine.eval.TestEvalTree.TestSum;
import tajo.engine.parser.QueryAnalyzer;
-import tajo.engine.parser.QueryBlock;
import tajo.engine.planner.LogicalOptimizer;
import tajo.engine.planner.LogicalPlanner;
import tajo.engine.planner.PlanningContext;
@@ -53,10 +49,7 @@ import tajo.master.TajoMaster;
import tajo.storage.*;
import java.io.IOException;
-import java.net.URI;
import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
import static org.junit.Assert.*;
@@ -389,140 +382,4 @@ public class TestGlobalQueryPlanner {
assertEquals(ExprType.SCAN, groupby.getSubNode().getType());
}
}
-
- @Test
- public void testHashFetches() {
- URI[] uris = {
- URI.create("http://192.168.0.21:35385/?qid=query_20120726371_000_001_003_000835_00&fn=0"),
- URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_001064_00&fn=0"),
- URI.create("http://192.168.0.21:35385/?qid=query_20120726371_000_001_003_001059_00&fn=0"),
- URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_000104_00&fn=0"),
- URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_000104_00&fn=1"),
- URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_001059_00&fn=1")
- };
-
- Map<String, List<URI>> hashed = GlobalPlanner.hashFetches(null, Lists.newArrayList(uris));
- assertEquals(2, hashed.size());
- List<URI> res = hashed.get("0");
- assertEquals(2, res.size());
- res = hashed.get("1");
- assertEquals(1, res.size());
- QueryStringDecoder decoder = new QueryStringDecoder(res.get(0));
- Map<String, List<String>> params = decoder.getParameters();
- String [] qids = params.get("qid").get(0).split(",");
- assertEquals(2, qids.length);
- assertEquals("104_0", qids[0]);
- assertEquals("1059_0", qids[1]);
- }
-
- @Test
- public void testHashFetchesForBinary() {
- URI[] urisOuter = {
- URI.create("http://192.168.0.21:35385/?qid=query_20120726371_000_001_003_000835_00&fn=0"),
- URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_001064_00&fn=0"),
- URI.create("http://192.168.0.21:35385/?qid=query_20120726371_000_001_003_001059_00&fn=0"),
- URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_000104_00&fn=0"),
- URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_000104_00&fn=1"),
- URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_001059_00&fn=1")
- };
-
- URI[] urisInner = {
- URI.create("http://192.168.0.21:35385/?qid=query_20120726371_000_001_004_000111_00&fn=0"),
- URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_004_000123_00&fn=0"),
- URI.create("http://192.168.0.17:35385/?qid=query_20120726371_000_001_004_00134_00&fn=0"),
- URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_004_000155_00&fn=0"),
- URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_004_000255_00&fn=1"),
- URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_004_001356_00&fn=1")
- };
-
- Schema schema1 = new Schema();
- schema1.addColumn("col1", Type.INT4);
- TableMeta meta1 = new TableMetaImpl(schema1, StoreType.CSV, Options.create());
- TableDesc desc1 = new TableDescImpl("table1", meta1, new Path("/"));
- TableDesc desc2 = new TableDescImpl("table2", meta1, new Path("/"));
-
- QueryBlock.FromTable table1 = new QueryBlock.FromTable(desc1);
- QueryBlock.FromTable table2 = new QueryBlock.FromTable(desc2);
- ScanNode scan1 = new ScanNode(table1);
- ScanNode scan2 = new ScanNode(table2);
-
- Map<ScanNode, List<URI>> uris = Maps.newHashMap();
- uris.put(scan1, Lists.newArrayList(urisOuter));
- uris.put(scan2, Lists.newArrayList(urisInner));
-
- Map<String, Map<ScanNode, List<URI>>> hashed = GlobalPlanner.hashFetches(uris);
- assertEquals(2, hashed.size());
- assertTrue(hashed.keySet().contains("0"));
- assertTrue(hashed.keySet().contains("1"));
-
- assertTrue(hashed.get("0").containsKey(scan1));
- assertTrue(hashed.get("0").containsKey(scan2));
-
- assertEquals(2, hashed.get("0").get(scan1).size());
- assertEquals(3, hashed.get("0").get(scan2).size());
-
- QueryStringDecoder decoder = new QueryStringDecoder(hashed.get("0").get(scan1).get(0));
- Map<String, List<String>> params = decoder.getParameters();
- String [] qids = params.get("qid").get(0).split(",");
- assertEquals(2, qids.length);
- assertEquals("1064_0", qids[0]);
- assertEquals("104_0", qids[1]);
-
- decoder = new QueryStringDecoder(hashed.get("0").get(scan1).get(1));
- params = decoder.getParameters();
- qids = params.get("qid").get(0).split(",");
- assertEquals(2, qids.length);
- assertEquals("835_0", qids[0]);
- assertEquals("1059_0", qids[1]);
- }
-
- @Test
- public void testCreateMultilevelGroupby()
- throws IOException, CloneNotSupportedException {
- PlanningContext context = analyzer.parse(
- "create table store1 as select age, sumtest(salary) from table0 group by age");
- LogicalNode plan = logicalPlanner.createPlan(context);
- plan = LogicalOptimizer.optimize(context, plan);
-
- MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan);
-
- ExecutionBlock second, first, mid;
- ScanNode secondScan, firstScan, midScan;
-
- second = globalPlan.getRoot();
- assertTrue(second.getScanNodes().length == 1);
-
- first = second.getChildBlock(second.getScanNodes()[0]);
-
- GroupbyNode firstGroupby, secondGroupby, midGroupby;
- secondGroupby = (GroupbyNode) second.getStoreTableNode().getSubNode();
-
- Column[] originKeys = secondGroupby.getGroupingColumns();
- Column[] newKeys = new Column[2];
- newKeys[0] = new Column("age", Type.INT4);
- newKeys[1] = new Column("name", Type.TEXT);
-
- mid = planner.createMultilevelGroupby(first, newKeys);
- midGroupby = (GroupbyNode) mid.getStoreTableNode().getSubNode();
- firstGroupby = (GroupbyNode) first.getStoreTableNode().getSubNode();
-
- secondScan = second.getScanNodes()[0];
- midScan = mid.getScanNodes()[0];
- firstScan = first.getScanNodes()[0];
-
- assertTrue(first.getParentBlock().equals(mid));
- assertTrue(mid.getParentBlock().equals(second));
- assertTrue(second.getChildBlock(secondScan).equals(mid));
- assertTrue(mid.getChildBlock(midScan).equals(first));
- assertEquals(first.getOutputName(), midScan.getTableId());
- assertEquals(first.getOutputSchema(), midScan.getInSchema());
- assertEquals(mid.getOutputName(), secondScan.getTableId());
- assertEquals(mid.getOutputSchema(), secondScan.getOutSchema());
- assertArrayEquals(newKeys, firstGroupby.getGroupingColumns());
- assertArrayEquals(newKeys, midGroupby.getGroupingColumns());
- assertArrayEquals(originKeys, secondGroupby.getGroupingColumns());
- assertFalse(firstScan.isLocal());
- assertTrue(midScan.isLocal());
- assertTrue(secondScan.isLocal());
- }
}