You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/05/09 09:36:37 UTC

git commit: TAJO-58: Remove obsolete methods in GlobalPlanner. (hyunsik)

Updated Branches:
  refs/heads/master 0652815f5 -> 2923f4e3b


TAJO-58: Remove obsolete methods in GlobalPlanner. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/2923f4e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/2923f4e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/2923f4e3

Branch: refs/heads/master
Commit: 2923f4e3b5812f25e73e26c2af0add4262c6a07e
Parents: 0652815
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu May 9 16:34:29 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu May 9 16:35:31 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                        |    4 +
 .../src/main/java/tajo/master/GlobalPlanner.java   |  538 +--------------
 .../engine/plan/global/TestGlobalQueryPlanner.java |  143 ----
 3 files changed, 8 insertions(+), 677 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2923f4e3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cba4f8f..7dd10de 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@ Release 0.2.0 - unreleased
 
   NEW FEATURES
    
+    TAJO-57: Recognize Parser and Catalog Standard SQL data types. (hyunsik)
+
     TAJO-33: Implement a basic query progress indicator. (hyunsik)
 
   IMPROVEMENTS
@@ -37,6 +39,8 @@ Release 0.2.0 - unreleased
 
   BUG FIXES
 
+    TAJO-58: Remove obsolete methods in GlobalPlanner. (hyunsik)
+
     TAJO-54: SubQuery::allocateContainers() may ask 0 containers. (hyunsik)
 
     TAJO-47: RowFile has the duplicated initialization problem and unflipped 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2923f4e3/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
index 376f15d..2176b8e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
@@ -18,39 +18,28 @@
 
 package tajo.master;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
 import tajo.QueryId;
 import tajo.QueryIdFactory;
-import tajo.QueryUnitAttemptId;
 import tajo.SubQueryId;
 import tajo.catalog.*;
 import tajo.catalog.proto.CatalogProtos.StoreType;
-import tajo.common.exception.NotImplementedException;
 import tajo.conf.TajoConf;
 import tajo.engine.parser.QueryBlock.FromTable;
 import tajo.engine.planner.PlannerUtil;
 import tajo.engine.planner.global.MasterPlan;
 import tajo.engine.planner.logical.*;
-import tajo.engine.utils.TupleUtil;
 import tajo.master.ExecutionBlock.PartitionType;
-import tajo.storage.Fragment;
 import tajo.storage.StorageManager;
-import tajo.storage.TupleRange;
 import tajo.util.TajoIdUtils;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.util.*;
-import java.util.Map.Entry;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public class GlobalPlanner {
   private static Log LOG = LogFactory.getLog(GlobalPlanner.class);
@@ -690,75 +679,6 @@ public class GlobalPlanner {
           prevOutputType);
     }
   }
-
-  @VisibleForTesting
-  public ExecutionBlock createMultilevelGroupby(
-      ExecutionBlock firstPhaseGroupby, Column[] keys)
-      throws CloneNotSupportedException, IOException {
-    ExecutionBlock secondPhaseGroupby = firstPhaseGroupby.getParentBlock();
-    Preconditions.checkState(secondPhaseGroupby.getScanNodes().length == 1);
-
-    ScanNode secondScan = secondPhaseGroupby.getScanNodes()[0];
-    GroupbyNode secondGroupby = (GroupbyNode) secondPhaseGroupby.
-        getStoreTableNode().getSubNode();
-    ExecutionBlock newPhaseGroupby = new ExecutionBlock(
-        QueryIdFactory.newSubQueryId(firstPhaseGroupby.getId().getQueryId()));
-    LogicalNode tmp = PlannerUtil.findTopParentNode(
-        firstPhaseGroupby.getPlan(), ExprType.GROUP_BY);
-    GroupbyNode firstGroupby;
-    if (tmp instanceof UnaryNode) {
-      firstGroupby = (GroupbyNode) ((UnaryNode)tmp).getSubNode();
-      GroupbyNode newFirstGroupby = GlobalPlannerUtils.newGroupbyPlan(
-          firstGroupby.getInSchema(),
-          firstGroupby.getOutSchema(),
-          keys,
-          firstGroupby.getHavingCondition(),
-          firstGroupby.getTargets()
-      );
-      newFirstGroupby.setSubNode(firstGroupby.getSubNode());
-      ((UnaryNode) tmp).setSubNode(newFirstGroupby);
-    }
-
-    // create a new SubQuery containing the group by plan
-    StoreTableNode newStore = GlobalPlannerUtils.newStorePlan(
-        secondScan.getInSchema(),
-        newPhaseGroupby.getId().toString());
-    newStore.setLocal(true);
-    ScanNode newScan = GlobalPlannerUtils.newScanPlan(
-        firstPhaseGroupby.getOutputSchema(),
-        firstPhaseGroupby.getOutputName(),
-        sm.getTablePath(firstPhaseGroupby.getOutputName()));
-    newScan.setLocal(true);
-    GroupbyNode newGroupby = GlobalPlannerUtils.newGroupbyPlan(
-        newScan.getOutSchema(),
-        newStore.getInSchema(),
-        keys,
-        secondGroupby.getHavingCondition(),
-        secondGroupby.getTargets());
-    newGroupby.setSubNode(newScan);
-    newStore.setSubNode(newGroupby);
-    newPhaseGroupby.setPlan(newStore);
-
-    secondPhaseGroupby.removeChildBlock(secondScan);
-
-    // update the scan node of last phase
-    secondScan = GlobalPlannerUtils.newScanPlan(secondScan.getInSchema(),
-        newPhaseGroupby.getOutputName(),
-        sm.getTablePath(newPhaseGroupby.getOutputName()));
-    secondScan.setLocal(true);
-    secondGroupby.setSubNode(secondScan);
-    secondPhaseGroupby.setPlan(secondPhaseGroupby.getPlan());
-
-    // insert the new SubQuery
-    // between the first phase and the second phase
-    secondPhaseGroupby.addChildBlock(secondScan, newPhaseGroupby);
-    newPhaseGroupby.addChildBlock(newPhaseGroupby.getScanNodes()[0],
-        firstPhaseGroupby);
-    newPhaseGroupby.setParentBlock(secondPhaseGroupby);
-    firstPhaseGroupby.setParentBlock(newPhaseGroupby);
-
-    return newPhaseGroupby;
-  }
   
   private LogicalNode insertOuterScan(BinaryNode parent, String tableId,
       TableMeta meta) throws IOException {
@@ -798,454 +718,4 @@ public class GlobalPlanner {
     }
     return new MasterPlan(root);
   }
-  
-  /**
-   * 2개의 scan을 가진 QueryUnit들에 fragment와 fetch를 할당
-   * 
-   * @param subQuery
-   * @param n
-   * @param fragMap
-   * @param fetchMap
-   * @return
-   */
-  private List<QueryUnit> makeBinaryQueryUnit(SubQuery subQuery, final int n,
-      Map<ScanNode, List<Fragment>> fragMap, 
-      Map<ScanNode, List<URI>> fetchMap) {
-    ExecutionBlock execBlock = subQuery.getBlock();
-    ScanNode[] scans = execBlock.getScanNodes();
-    List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
-    final int maxQueryUnitNum = n;
-    
-    if (execBlock.hasChildBlock()) {
-      ExecutionBlock prev = execBlock.getChildBlock(scans[0]);
-      switch (prev.getPartitionType()) {
-      case BROADCAST:
-        throw new NotImplementedException();
-      case HASH:
-        if (scans[0].isLocal()) {
-          queryUnits = assignFetchesToBinaryByHash(subQuery, queryUnits,
-              fetchMap, maxQueryUnitNum);
-          queryUnits = assignEqualFragment(queryUnits, fragMap);
-        } else {
-          throw new NotImplementedException();
-        }
-        break;
-      case LIST:
-        throw new NotImplementedException();
-      }
-    } else {
-      queryUnits = makeQueryUnitsForBinaryPlan(subQuery,
-          queryUnits, fragMap);
-    }
-
-    return queryUnits;
-  }
-
-  public List<QueryUnit> makeQueryUnitsForBinaryPlan(
-      SubQuery subQuery, List<QueryUnit> queryUnits,
-      Map<ScanNode, List<Fragment>> fragmentMap) {
-    ExecutionBlock execBlock = subQuery.getBlock();
-    QueryUnit queryUnit;
-    if (execBlock.hasJoin()) {
-      // make query units for every composition of fragments of each scan
-      Preconditions.checkArgument(fragmentMap.size()==2);
-
-      ScanNode [] scanNodes = execBlock.getScanNodes();
-      String innerId = null, outerId = null;
-
-      // If one relation is set to broadcast, it meaning that the relation
-      // is less than one block size. That is, the relation has only
-      // one fragment. If this assumption is kept, the below code is always
-      // correct.
-      if (scanNodes[0].isBroadcast() || scanNodes[1].isBroadcast()) {
-        List<Fragment> broadcastFrag = null;
-        List<Fragment> baseFrag = null;
-        if (scanNodes[0].isBroadcast()) {
-          broadcastFrag = fragmentMap.get(scanNodes[0]);
-          baseFrag = fragmentMap.get(scanNodes[1]);
-
-          innerId = scanNodes[0].getTableId();
-          outerId = scanNodes[1].getTableId();
-        } else if (scanNodes[1].isBroadcast()) {
-          broadcastFrag = fragmentMap.get(scanNodes[1]);
-          baseFrag = fragmentMap.get(scanNodes[0]);
-
-          innerId = scanNodes[1].getTableId();
-          outerId = scanNodes[0].getTableId();
-        }
-
-        for (Fragment outer : baseFrag) {
-          queryUnit = newQueryUnit(subQuery);
-          queryUnit.setFragment(outerId, outer);
-          for (Fragment inner : broadcastFrag) {
-            queryUnit.setFragment(innerId, inner);
-          }
-          queryUnits.add(queryUnit);
-        }
-      } else {
-        List<Fragment> innerFrags, outerFrags;
-        Iterator<Entry<ScanNode, List<Fragment>>> it =
-            fragmentMap.entrySet().iterator();
-        Entry<ScanNode, List<Fragment>> e = it.next();
-        innerId = e.getKey().getTableId();
-        innerFrags = e.getValue();
-        e = it.next();
-        outerId = e.getKey().getTableId();
-        outerFrags = e.getValue();
-        for (Fragment outer : outerFrags) {
-          for (Fragment inner : innerFrags) {
-            queryUnit = newQueryUnit(subQuery);
-            queryUnit.setFragment(innerId, inner);
-            queryUnit.setFragment(outerId, outer);
-            queryUnits.add(queryUnit);
-          }
-        }
-      }
-    }
-
-    return queryUnits;
-  }
-
-  private List<QueryUnit> makeQueryUnitsForEachFragment(
-      SubQuery subQuery, List<QueryUnit> queryUnits,
-      ScanNode scan, List<Fragment> fragments) {
-    QueryUnit queryUnit;
-    for (Fragment fragment : fragments) {
-      queryUnit = newQueryUnit(subQuery);
-      queryUnit.setFragment(scan.getTableId(), fragment);
-      queryUnits.add(queryUnit);
-    }
-    return queryUnits;
-  }
-  
-  private QueryUnit newQueryUnit(SubQuery subQuery) {
-    ExecutionBlock execBlock = subQuery.getBlock();
-    QueryUnit unit = new QueryUnit(
-        QueryIdFactory.newQueryUnitId(subQuery.getId()), execBlock.isLeafBlock(),
-        subQuery.getEventHandler());
-    unit.setLogicalPlan(execBlock.getPlan());
-    return unit;
-  }
-  
-  /**
-   * Binary QueryUnit들에 hash 파티션된 fetch를 할당
-   * 
-   * @param subQuery
-   * @param unitList
-   * @param fetchMap
-   * @param n
-   * @return
-   */
-  private List<QueryUnit> assignFetchesToBinaryByHash(SubQuery subQuery,
-      List<QueryUnit> unitList, Map<ScanNode, List<URI>> fetchMap, final int n) {
-    QueryUnit unit;
-    int i = 0;
-    Map<String, Map<ScanNode, List<URI>>> hashed = hashFetches(fetchMap);
-    Iterator<Entry<String, Map<ScanNode, List<URI>>>> it = 
-        hashed.entrySet().iterator();
-    Entry<String, Map<ScanNode, List<URI>>> e;
-    while (it.hasNext()) {
-      e = it.next();
-      if (e.getValue().size() == 2) { // only if two matched partitions
-        if (unitList.size() == n) {
-          unit = unitList.get(i++);
-          if (i == unitList.size()) {
-            i = 0;
-          }
-        } else {
-          unit = newQueryUnit(subQuery);
-          unitList.add(unit);
-        }
-        Map<ScanNode, List<URI>> m = e.getValue();
-        for (ScanNode scan : m.keySet()) {
-          for (URI uri : m.get(scan)) {
-            unit.addFetch(scan.getTableId(), uri);
-          }
-        }
-      }
-    }
-    
-    return unitList;
-  }
-  
-  /**
-   * Unary QueryUnit들에 hash 파티션된 fetch를 할당
-   * 
-   * @param subQuery
-   * @param unitList
-   * @param scan
-   * @param uriList
-   * @param n
-   * @return
-   */
-  private List<QueryUnit> assignFetchesToUnaryByHash(SubQuery subQuery,
-      List<QueryUnit> unitList, ScanNode scan, List<URI> uriList, int n) {
-    Map<String, List<URI>> hashed = hashFetches(subQuery.getId(), uriList); // hash key, uri
-    QueryUnit unit;
-    int i = 0;
-    // TODO: units에 hashed 할당
-    Iterator<Entry<String, List<URI>>> it = hashed.entrySet().iterator();
-    Entry<String, List<URI>> e;
-    while (it.hasNext()) {
-      e = it.next();
-      if (unitList.size() == n) {
-        unit = unitList.get(i++);
-        if (i == unitList.size()) {
-          i = 0;
-        }
-      } else {
-        unit = newQueryUnit(subQuery);
-        unitList.add(unit);
-      }
-      unit.addFetches(scan.getTableId(), e.getValue());
-    }
-    return unitList;
-  }
-
-  private List<QueryUnit> assignFetchesByRange(SubQuery subQuery,
-                                              List<QueryUnit> unitList,
-                                              ScanNode scan,
-                                              List<URI> uriList,
-                                              int n,
-                                              Schema rangeSchema,
-                                              boolean ascendingFirstKey)
-      throws UnsupportedEncodingException {
-    Map<TupleRange, Set<URI>> partitions =
-        rangeFetches(rangeSchema, uriList, ascendingFirstKey);
-    // grouping urls by range
-    QueryUnit unit;
-    int i = 0;
-    Iterator<Entry<TupleRange, Set<URI>>> it = partitions.entrySet().iterator();
-    Entry<TupleRange, Set<URI>> e;
-    while (it.hasNext()) {
-      e = it.next();
-      if (unitList.size() == n) {
-        unit = unitList.get(i++);
-        if (i == unitList.size()) {
-          i = 0;
-        }
-      } else {
-        unit = newQueryUnit(subQuery);
-        unitList.add(unit);
-      }
-      unit.addFetches(scan.getTableId(), e.getValue());
-    }
-    return unitList;
-  }
-  
-  /**
-   * Unary QueryUnit들에 list 파티션된 fetch를 할당
-   * 
-   * @param subQuery
-   * @param unitList
-   * @param scan
-   * @param uriList
-   * @param n
-   * @return
-   */
-  private List<QueryUnit> assignFetchesByRoundRobin(SubQuery subQuery,
-      List<QueryUnit> unitList, ScanNode scan, List<URI> uriList, int n) { 
-    QueryUnit unit;
-    int i = 0;
-    for (URI uri : uriList) {
-      if (unitList.size() < n) {
-        unit = newQueryUnit(subQuery);
-        unitList.add(unit);
-      } else {
-        unit = unitList.get(i++);
-        if (i == unitList.size()) {
-          i = 0;
-        }
-      }
-      unit.addFetch(scan.getTableId(), uri);
-    }
-    return unitList;
-  }
-
-  @VisibleForTesting
-  public static Map<String, Map<ScanNode, List<URI>>> hashFetches(Map<ScanNode, List<URI>> uriMap) {
-    SortedMap<String, Map<ScanNode, List<URI>>> hashed =
-        new TreeMap<String, Map<ScanNode, List<URI>>>();
-    String uriPath, key;
-    Map<ScanNode, List<URI>> m;
-    List<URI> uriList;
-    for (Entry<ScanNode, List<URI>> e : uriMap.entrySet()) {
-      for (URI uri : e.getValue()) {
-        uriPath = uri.toString();
-        key = uriPath.substring(uriPath.lastIndexOf("=")+1);
-        if (hashed.containsKey(key)) {
-          m = hashed.get(key);
-        } else {
-          m = new HashMap<ScanNode, List<URI>>();
-        }
-        if (m.containsKey(e.getKey())) {
-          uriList = m.get(e.getKey());
-        } else {
-          uriList = new ArrayList<URI>();
-        }
-        uriList.add(uri);
-        m.put(e.getKey(), uriList);
-        hashed.put(key, m);
-      }
-    }
-
-    SortedMap<String, Map<ScanNode, List<URI>>> finalHashed = new TreeMap<String, Map<ScanNode,List<URI>>>();
-    for (Entry<String, Map<ScanNode, List<URI>>> entry : hashed.entrySet()) {
-      finalHashed.put(entry.getKey(), combineURIByHostForBinary(entry.getValue()));
-    }
-    
-    return finalHashed;
-  }
-
-  private static Map<ScanNode, List<URI>> combineURIByHostForBinary(Map<ScanNode, List<URI>> hashed) {
-    Map<ScanNode, List<URI>> finalHashed = Maps.newHashMap();
-    for (Entry<ScanNode, List<URI>> urisByKey : hashed.entrySet()) {
-      Map<String, List<String>> param = new QueryStringDecoder(urisByKey.getValue().get(0)).getParameters();
-      QueryUnitAttemptId quid = new QueryUnitAttemptId(
-          new QueryStringDecoder(urisByKey.getValue().get(0)).getParameters().get("qid").get(0));
-      SubQueryId sid = quid.getSubQueryId();
-      String fn = param.get("fn").get(0);
-      Map<String, List<String>> quidByHost = Maps.newHashMap();
-      for(URI uri : urisByKey.getValue()) {
-        final Map<String,List<String>> params =
-            new QueryStringDecoder(uri).getParameters();
-        if (quidByHost.containsKey(uri.getHost() + ":" + uri.getPort())) {
-          quidByHost.get(uri.getHost() + ":" + uri.getPort()).add(params.get("qid").get(0));
-        } else {
-          quidByHost.put(uri.getHost() + ":" + uri.getPort(), Lists.newArrayList(params.get("qid")));
-        }
-      }
-
-      finalHashed.put(urisByKey.getKey(), mergeURI(quidByHost, sid.toString(), fn));
-    }
-
-    return finalHashed;
-  }
-
-
-  @VisibleForTesting
-  public static Map<String, List<URI>> hashFetches(SubQueryId sid, List<URI> uriList) {
-    SortedMap<String, List<URI>> hashed = new TreeMap<String, List<URI>>();
-    String uriPath, key;
-    for (URI uri : uriList) {
-      // TODO
-      uriPath = uri.toString();
-      key = uriPath.substring(uriPath.lastIndexOf("=")+1);
-      if (hashed.containsKey(key)) {
-        hashed.get(key).add(uri);
-      } else {
-        List<URI> list = new ArrayList<URI>();
-        list.add(uri);
-        hashed.put(key, list);
-      }
-    }
-    
-    return combineURIByHost(hashed);
-  }
-
-  private static Map<String, List<URI>> combineURIByHost(Map<String, List<URI>> hashed) {
-    Map<String, List<URI>> finalHashed = Maps.newTreeMap();
-    for (Entry<String, List<URI>> urisByKey : hashed.entrySet()) {
-      QueryUnitAttemptId quid = new QueryUnitAttemptId(
-          new QueryStringDecoder(urisByKey.getValue().get(0)).getParameters().get("qid").get(0));
-      SubQueryId sid = quid.getSubQueryId();
-      Map<String,List<String>> quidByHost = Maps.newHashMap();
-      for(URI uri : urisByKey.getValue()) {
-        final Map<String,List<String>> params =
-            new QueryStringDecoder(uri).getParameters();
-        if (quidByHost.containsKey(uri.getHost() + ":" + uri.getPort())) {
-          quidByHost.get(uri.getHost() + ":" + uri.getPort()).add(params.get("qid").get(0));
-        } else {
-          quidByHost.put(uri.getHost() + ":" + uri.getPort(), Lists.newArrayList(params.get("qid")));
-        }
-      }
-      finalHashed.put(urisByKey.getKey(), mergeURI(quidByHost, sid.toString(), urisByKey.getKey()));
-    }
-    return finalHashed;
-  }
-
-  private static List<URI> mergeURI(Map<String, List<String>> quidByKey, String sid, String fn) {
-    List<URI> uris = Lists.newArrayList();
-    for (Entry<String, List<String>> quidByHost : quidByKey.entrySet()) {
-      StringBuilder sb = new StringBuilder("http://")
-          .append(quidByHost.getKey()).append("/").append("?fn="+fn).append("&sid="+sid);
-      sb.append("&qid=");
-      boolean first = true;
-      for (String qid : quidByHost.getValue()) {
-        if (first) {
-          first = false;
-        } else {
-          sb.append(",");
-        }
-
-        QueryUnitAttemptId quid = new QueryUnitAttemptId(qid);
-        sb.append(quid.getQueryUnitId().getId() + "_");
-        sb.append(quid.getId());
-      }
-      uris.add(URI.create(sb.toString()));
-    }
-    return uris;
-  }
-
-  private Map<TupleRange, Set<URI>> rangeFetches(final Schema schema,
-                                                 final List<URI> uriList,
-                                                 final boolean ascendingFirstKey)
-      throws UnsupportedEncodingException {
-    SortedMap<TupleRange, Set<URI>> map;
-    if (ascendingFirstKey) {
-      map = new TreeMap<TupleRange, Set<URI>>();
-    } else {
-      map = new TreeMap<TupleRange, Set<URI>>(new TupleRange.DescendingTupleRangeComparator());
-    }
-    TupleRange range;
-    Set<URI> uris;
-    for (URI uri : uriList) {
-      // URI.getQuery() returns a url-decoded query string.
-      range = TupleUtil.queryToRange(schema, uri.getQuery());
-      if (map.containsKey(range)) {
-        uris = map.get(range);
-        uris.add(uri);
-      } else {
-        uris = Sets.newHashSet();
-        uris.add(uri);
-        map.put(range, uris);
-      }
-    }
-
-    return map;
-  }
-
-  /**
-   * Unary QueryUnit들에 대하여 동일한 fragment를 할당
-   * 
-   * @param unitList
-   * @param scan
-   * @param frag
-   * @return
-   */
-  private List<QueryUnit> assignEqualFragment(List<QueryUnit> unitList, 
-      ScanNode scan, Fragment frag) {
-    for (int i = 0; i < unitList.size(); i++) {
-      unitList.get(i).setFragment(scan.getTableId(), frag);
-    }
-    
-    return unitList;
-  }
-  
-  /**
-   * Binary QueryUnit들에 대하여 scan별로 동일한 fragment를 할당
-   * @param unitList
-   * @param fragMap
-   * @return
-   */
-  private List<QueryUnit> assignEqualFragment(List<QueryUnit> unitList, 
-      Map<ScanNode, List<Fragment>> fragMap) {
-    for (int i = 0; i < unitList.size(); i++) {
-      for (ScanNode scan : fragMap.keySet()) {
-        unitList.get(i).setFragment(scan.getTableId(),
-            fragMap.get(scan).get(0));
-      }
-    }
-    return unitList;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2923f4e3/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java
index ebd9d07..efe0562 100644
--- a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java
@@ -18,13 +18,10 @@
 
 package tajo.engine.plan.global;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.zookeeper.KeeperException;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -40,7 +37,6 @@ import tajo.datum.Datum;
 import tajo.datum.DatumFactory;
 import tajo.engine.eval.TestEvalTree.TestSum;
 import tajo.engine.parser.QueryAnalyzer;
-import tajo.engine.parser.QueryBlock;
 import tajo.engine.planner.LogicalOptimizer;
 import tajo.engine.planner.LogicalPlanner;
 import tajo.engine.planner.PlanningContext;
@@ -53,10 +49,7 @@ import tajo.master.TajoMaster;
 import tajo.storage.*;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 
 import static org.junit.Assert.*;
 
@@ -389,140 +382,4 @@ public class TestGlobalQueryPlanner {
       assertEquals(ExprType.SCAN, groupby.getSubNode().getType());
     }
   }
-
-  @Test
-  public void testHashFetches() {
-    URI[] uris = {
-        URI.create("http://192.168.0.21:35385/?qid=query_20120726371_000_001_003_000835_00&fn=0"),
-        URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_001064_00&fn=0"),
-        URI.create("http://192.168.0.21:35385/?qid=query_20120726371_000_001_003_001059_00&fn=0"),
-        URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_000104_00&fn=0"),
-        URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_000104_00&fn=1"),
-        URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_001059_00&fn=1")
-    };
-
-    Map<String, List<URI>> hashed = GlobalPlanner.hashFetches(null, Lists.newArrayList(uris));
-    assertEquals(2, hashed.size());
-    List<URI> res = hashed.get("0");
-    assertEquals(2, res.size());
-    res = hashed.get("1");
-    assertEquals(1, res.size());
-    QueryStringDecoder decoder = new QueryStringDecoder(res.get(0));
-    Map<String, List<String>> params = decoder.getParameters();
-    String [] qids = params.get("qid").get(0).split(",");
-    assertEquals(2, qids.length);
-    assertEquals("104_0", qids[0]);
-    assertEquals("1059_0", qids[1]);
-  }
-
-  @Test
-  public void testHashFetchesForBinary() {
-    URI[] urisOuter = {
-        URI.create("http://192.168.0.21:35385/?qid=query_20120726371_000_001_003_000835_00&fn=0"),
-        URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_001064_00&fn=0"),
-        URI.create("http://192.168.0.21:35385/?qid=query_20120726371_000_001_003_001059_00&fn=0"),
-        URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_000104_00&fn=0"),
-        URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_000104_00&fn=1"),
-        URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_001059_00&fn=1")
-    };
-
-    URI[] urisInner = {
-        URI.create("http://192.168.0.21:35385/?qid=query_20120726371_000_001_004_000111_00&fn=0"),
-        URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_004_000123_00&fn=0"),
-        URI.create("http://192.168.0.17:35385/?qid=query_20120726371_000_001_004_00134_00&fn=0"),
-        URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_004_000155_00&fn=0"),
-        URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_004_000255_00&fn=1"),
-        URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_004_001356_00&fn=1")
-    };
-
-    Schema schema1 = new Schema();
-    schema1.addColumn("col1", Type.INT4);
-    TableMeta meta1 = new TableMetaImpl(schema1, StoreType.CSV, Options.create());
-    TableDesc desc1 = new TableDescImpl("table1", meta1, new Path("/"));
-    TableDesc desc2 = new TableDescImpl("table2", meta1, new Path("/"));
-
-    QueryBlock.FromTable table1 = new QueryBlock.FromTable(desc1);
-    QueryBlock.FromTable table2 = new QueryBlock.FromTable(desc2);
-    ScanNode scan1 = new ScanNode(table1);
-    ScanNode scan2 = new ScanNode(table2);
-
-    Map<ScanNode, List<URI>> uris = Maps.newHashMap();
-    uris.put(scan1, Lists.newArrayList(urisOuter));
-    uris.put(scan2, Lists.newArrayList(urisInner));
-
-    Map<String, Map<ScanNode, List<URI>>> hashed = GlobalPlanner.hashFetches(uris);
-    assertEquals(2, hashed.size());
-    assertTrue(hashed.keySet().contains("0"));
-    assertTrue(hashed.keySet().contains("1"));
-
-    assertTrue(hashed.get("0").containsKey(scan1));
-    assertTrue(hashed.get("0").containsKey(scan2));
-
-    assertEquals(2, hashed.get("0").get(scan1).size());
-    assertEquals(3, hashed.get("0").get(scan2).size());
-
-    QueryStringDecoder decoder = new QueryStringDecoder(hashed.get("0").get(scan1).get(0));
-    Map<String, List<String>> params = decoder.getParameters();
-    String [] qids = params.get("qid").get(0).split(",");
-    assertEquals(2, qids.length);
-    assertEquals("1064_0", qids[0]);
-    assertEquals("104_0", qids[1]);
-
-    decoder = new QueryStringDecoder(hashed.get("0").get(scan1).get(1));
-    params = decoder.getParameters();
-    qids = params.get("qid").get(0).split(",");
-    assertEquals(2, qids.length);
-    assertEquals("835_0", qids[0]);
-    assertEquals("1059_0", qids[1]);
-  }
-
-  @Test
-  public void testCreateMultilevelGroupby()
-      throws IOException, CloneNotSupportedException {
-    PlanningContext context = analyzer.parse(
-        "create table store1 as select age, sumtest(salary) from table0 group by age");
-    LogicalNode plan = logicalPlanner.createPlan(context);
-    plan = LogicalOptimizer.optimize(context, plan);
-
-    MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan);
-
-    ExecutionBlock second, first, mid;
-    ScanNode secondScan, firstScan, midScan;
-
-    second = globalPlan.getRoot();
-    assertTrue(second.getScanNodes().length == 1);
-
-    first = second.getChildBlock(second.getScanNodes()[0]);
-
-    GroupbyNode firstGroupby, secondGroupby, midGroupby;
-    secondGroupby = (GroupbyNode) second.getStoreTableNode().getSubNode();
-
-    Column[] originKeys = secondGroupby.getGroupingColumns();
-    Column[] newKeys = new Column[2];
-    newKeys[0] = new Column("age", Type.INT4);
-    newKeys[1] = new Column("name", Type.TEXT);
-
-    mid = planner.createMultilevelGroupby(first, newKeys);
-    midGroupby = (GroupbyNode) mid.getStoreTableNode().getSubNode();
-    firstGroupby = (GroupbyNode) first.getStoreTableNode().getSubNode();
-
-    secondScan = second.getScanNodes()[0];
-    midScan = mid.getScanNodes()[0];
-    firstScan = first.getScanNodes()[0];
-
-    assertTrue(first.getParentBlock().equals(mid));
-    assertTrue(mid.getParentBlock().equals(second));
-    assertTrue(second.getChildBlock(secondScan).equals(mid));
-    assertTrue(mid.getChildBlock(midScan).equals(first));
-    assertEquals(first.getOutputName(), midScan.getTableId());
-    assertEquals(first.getOutputSchema(), midScan.getInSchema());
-    assertEquals(mid.getOutputName(), secondScan.getTableId());
-    assertEquals(mid.getOutputSchema(), secondScan.getOutSchema());
-    assertArrayEquals(newKeys, firstGroupby.getGroupingColumns());
-    assertArrayEquals(newKeys, midGroupby.getGroupingColumns());
-    assertArrayEquals(originKeys, secondGroupby.getGroupingColumns());
-    assertFalse(firstScan.isLocal());
-    assertTrue(midScan.isLocal());
-    assertTrue(secondScan.isLocal());
-  }
 }