You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/02/10 17:48:27 UTC

[2/2] git commit: TAJO-584: Improve distributed merge sort.

TAJO-584: Improve distributed merge sort.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/214b9741
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/214b9741
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/214b9741

Branch: refs/heads/master
Commit: 214b9741a510d1c2013e0dd494ab66017962367a
Parents: 4179a7c
Author: Hyunsik Choi <hy...@apache.org>
Authored: Tue Feb 11 01:48:15 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Tue Feb 11 01:48:15 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../exception/AlreadyExistsTableException.java  |   2 +-
 .../org/apache/tajo/jdbc/TajoResultSet.java     |  24 +-
 .../tajo/engine/planner/BaseAlgebraVisitor.java |  17 +-
 .../apache/tajo/engine/planner/LogicalPlan.java |   2 +-
 .../engine/planner/PhysicalPlannerImpl.java     | 149 ++++++---
 .../apache/tajo/engine/planner/PlannerUtil.java |  10 +
 .../engine/planner/PreLogicalPlanVerifier.java  |  33 +-
 .../engine/planner/RangePartitionAlgorithm.java |  86 +++--
 .../engine/planner/UniformRangePartition.java   | 160 +++++++---
 .../engine/planner/global/GlobalPlanner.java    |   2 +
 .../planner/physical/ExternalSortExec.java      |  77 +++--
 .../planner/physical/UnaryPhysicalExec.java     |  12 +-
 .../org/apache/tajo/engine/utils/TupleUtil.java | 316 +------------------
 .../tajo/master/querymaster/Repartitioner.java  |   8 +-
 .../tajo/worker/RangeRetrieverHandler.java      |   9 +-
 .../main/java/org/apache/tajo/worker/Task.java  |   2 +-
 .../planner/TestUniformRangePartition.java      |  68 ++--
 .../planner/physical/TestBSTIndexExec.java      |   3 +-
 .../planner/physical/TestExternalSortExec.java  |   6 +-
 .../physical/TestLeftOuterHashJoinExec.java     |   4 +
 .../physical/TestLeftOuterNLJoinExec.java       |  21 +-
 .../planner/physical/TestPhysicalPlanner.java   |   3 +
 .../physical/TestRightOuterHashJoinExec.java    |   5 +-
 .../engine/planner/physical/TestSortExec.java   |   6 +-
 .../apache/tajo/engine/query/TestSortQuery.java |  10 +
 .../apache/tajo/engine/util/TestTupleUtil.java  | 126 +-------
 .../tajo/worker/TestRangeRetrieverHandler.java  |  12 +-
 .../resources/dataset/TestSortQuery/table2.tbl  |  24 ++
 .../org/apache/tajo/jdbc/TestTajoResultSet.java |  63 ++++
 .../TestJoinQuery/testOuterJoinAndCaseWhen1.sql |   5 +-
 .../create_table_with_asc_desc_keys.sql         |   1 +
 .../TestSortQuery/testSortWithAscDescKeys.sql   |   1 +
 .../tajo/pullserver/PullServerAuxService.java   |   9 +-
 .../tajo/pullserver/TajoPullServerService.java  |  18 +-
 .../org/apache/tajo/storage/MergeScanner.java   |  22 +-
 .../java/org/apache/tajo/storage/RawFile.java   |  24 +-
 .../apache/tajo/storage/TupleComparator.java    |   4 +
 .../org/apache/tajo/storage/TupleRange.java     |  26 +-
 39 files changed, 680 insertions(+), 692 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8dc5ee5..d79ec91 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -130,6 +130,8 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-584: Improve distributed merge sort. (hyunsik)
+
     TAJO-36: Improve ExternalSortExec with N-merge sort and final pass
     omission. (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsTableException.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsTableException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsTableException.java
index 98c04b5..a2e3a6a 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsTableException.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsTableException.java
@@ -26,6 +26,6 @@ public class AlreadyExistsTableException extends CatalogException {
 	}
 
 	public AlreadyExistsTableException(String tableName) {
-		super("Already Exists Table: "+tableName);
+		super("Already exists table: "+tableName);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
index 3977d76..942107c 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.jdbc;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -25,7 +26,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.storage.FileScanner;
@@ -37,7 +37,6 @@ import org.apache.tajo.storage.fragment.FileFragment;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 
@@ -63,7 +62,7 @@ public class TajoResultSet extends TajoResultSetBase {
       fs = FileScanner.getFileSystem(conf, desc.getPath());
       this.totalRow = desc.getStats() != null ? desc.getStats().getNumRows() : 0;
 
-      Collection<FileFragment> frags = getFragments(desc.getMeta(), desc.getPath());
+      List<FileFragment> frags = getFragments(desc.getPath());
       scanner = new MergeScanner(conf, schema, desc.getMeta(), frags);
     }
     init();
@@ -75,23 +74,30 @@ public class TajoResultSet extends TajoResultSetBase {
     curRow = 0;
   }
 
-  static class FileNameComparator implements Comparator<FileStatus> {
+  public static class FileNameComparator implements Comparator<FileStatus> {
 
     @Override
     public int compare(FileStatus f1, FileStatus f2) {
-      return f2.getPath().getName().compareTo(f1.getPath().getName());
+      return f1.getPath().getName().compareTo(f2.getPath().getName());
     }
   }
 
-  private Collection<FileFragment> getFragments(TableMeta meta, Path tablePath)
+  private List<FileFragment> getFragments(Path tablePath)
       throws IOException {
-    List<FileFragment> fraglist = Lists.newArrayList();
+    List<FileFragment> fragments = Lists.newArrayList();
     FileStatus[] files = fs.listStatus(tablePath, new PathFilter() {
       @Override
       public boolean accept(Path path) {
         return path.getName().charAt(0) != '.';
       }
     });
+
+
+    // The files must be sorted in an ascending order of file names
+    // in order to guarantee the order of a sort operation.
+    // This is because our distributed sort algorithm outputs
+    // a sequence of sorted data files, each of which contains sorted rows
+    // within each file.
     Arrays.sort(files, new FileNameComparator());
 
     String tbname = tablePath.getName();
@@ -99,9 +105,9 @@ public class TajoResultSet extends TajoResultSetBase {
       if (files[i].getLen() == 0) {
         continue;
       }
-      fraglist.add(new FileFragment(tbname + "_" + i, files[i].getPath(), 0l, files[i].getLen()));
+      fragments.add(new FileFragment(tbname + "_" + i, files[i].getPath(), 0l, files[i].getLen()));
     }
-    return fraglist;
+    return ImmutableList.copyOf(fragments);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
index 6cc6fd0..25ee316 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
@@ -269,14 +269,19 @@ public class BaseAlgebraVisitor<CONTEXT, RESULT> implements AlgebraVisitor<CONTE
   @Override
   public RESULT visitProjection(CONTEXT ctx, Stack<Expr> stack, Projection expr) throws PlanningException {
     stack.push(expr);
-    if (!expr.isAllProjected()) {
-      for (NamedExpr target : expr.getNamedExprs()) {
-        visit(ctx, stack, target);
+    try {
+      if (!expr.isAllProjected()) {
+        for (NamedExpr target : expr.getNamedExprs()) {
+          visit(ctx, stack, target);
+        }
       }
+      if (expr.hasChild()) {
+        return visit(ctx, stack, expr.getChild());
+      }
+    } finally {
+      stack.pop();
     }
-    RESULT result = visit(ctx, stack, expr.getChild());
-    stack.pop();
-    return result;
+    return null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index 6afaaad..9890943 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -245,7 +245,7 @@ public class LogicalPlan {
 
       // Trying to find the column within the current block
 
-      if (block.currentNode != null) {
+      if (block.currentNode != null && block.currentNode.getInSchema() != null) {
         Column found = block.currentNode.getInSchema().getColumn(columnRef.getCanonicalName());
         if (found != null) {
           return found;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index dfa2e40..5583efd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -35,6 +35,7 @@ import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.physical.*;
 import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.fragment.FileFragment;
@@ -45,6 +46,7 @@ import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Stack;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType;
@@ -73,7 +75,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     PhysicalExec execPlan;
 
     try {
-      execPlan = createPlanRecursive(context, logicalPlan);
+      execPlan = createPlanRecursive(context, logicalPlan, new Stack<LogicalNode>());
       if (execPlan instanceof StoreTableExec
           || execPlan instanceof RangeShuffleFileWriteExec
           || execPlan instanceof HashShuffleFileWriteExec
@@ -103,7 +105,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     return outExecPlan;
   }
 
-  private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, LogicalNode logicalNode) throws IOException {
+  private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, LogicalNode logicalNode, Stack<LogicalNode> stack)
+      throws IOException {
     PhysicalExec leftExec;
     PhysicalExec rightExec;
 
@@ -111,7 +114,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
       case ROOT:
         LogicalRootNode rootNode = (LogicalRootNode) logicalNode;
-        return createPlanRecursive(ctx, rootNode.getChild());
+        stack.push(rootNode);
+        leftExec = createPlanRecursive(ctx, rootNode.getChild(), stack);
+        stack.pop();
+        return leftExec;
 
       case EXPRS:
         EvalExprNode evalExpr = (EvalExprNode) logicalNode;
@@ -121,61 +127,81 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       case INSERT:
       case STORE:
         StoreTableNode storeNode = (StoreTableNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, storeNode.getChild());
+        stack.push(storeNode);
+        leftExec = createPlanRecursive(ctx, storeNode.getChild(), stack);
+        stack.pop();
         return createStorePlan(ctx, storeNode, leftExec);
 
       case SELECTION:
         SelectionNode selNode = (SelectionNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, selNode.getChild());
+        stack.push(selNode);
+        leftExec = createPlanRecursive(ctx, selNode.getChild(), stack);
+        stack.pop();
         return new SelectionExec(ctx, selNode, leftExec);
 
       case PROJECTION:
         ProjectionNode prjNode = (ProjectionNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, prjNode.getChild());
+        stack.push(prjNode);
+        leftExec = createPlanRecursive(ctx, prjNode.getChild(), stack);
+        stack.pop();
         return new ProjectionExec(ctx, prjNode, leftExec);
 
       case TABLE_SUBQUERY: {
         TableSubQueryNode subQueryNode = (TableSubQueryNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, subQueryNode.getSubQuery());
-        ProjectionExec projectionExec = new ProjectionExec(ctx, subQueryNode, leftExec);
-        return projectionExec;
+        stack.push(subQueryNode);
+        leftExec = createPlanRecursive(ctx, subQueryNode.getSubQuery(), stack);
+        stack.pop();
+        return new ProjectionExec(ctx, subQueryNode, leftExec);
+
       }
 
       case PARTITIONS_SCAN:
       case SCAN:
-        leftExec = createScanPlan(ctx, (ScanNode) logicalNode);
+        leftExec = createScanPlan(ctx, (ScanNode) logicalNode, stack);
         return leftExec;
 
       case GROUP_BY:
         GroupbyNode grpNode = (GroupbyNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, grpNode.getChild());
+        stack.push(grpNode);
+        leftExec = createPlanRecursive(ctx, grpNode.getChild(), stack);
+        stack.pop();
         return createGroupByPlan(ctx, grpNode, leftExec);
 
       case HAVING:
         HavingNode havingNode = (HavingNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, havingNode.getChild());
+        stack.push(havingNode);
+        leftExec = createPlanRecursive(ctx, havingNode.getChild(), stack);
+        stack.pop();
         return new HavingExec(ctx, havingNode, leftExec);
 
       case SORT:
         SortNode sortNode = (SortNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, sortNode.getChild());
+        stack.push(sortNode);
+        leftExec = createPlanRecursive(ctx, sortNode.getChild(), stack);
+        stack.pop();
         return createSortPlan(ctx, sortNode, leftExec);
 
       case JOIN:
         JoinNode joinNode = (JoinNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, joinNode.getLeftChild());
-        rightExec = createPlanRecursive(ctx, joinNode.getRightChild());
+        stack.push(joinNode);
+        leftExec = createPlanRecursive(ctx, joinNode.getLeftChild(), stack);
+        rightExec = createPlanRecursive(ctx, joinNode.getRightChild(), stack);
+        stack.pop();
         return createJoinPlan(ctx, joinNode, leftExec, rightExec);
 
       case UNION:
         UnionNode unionNode = (UnionNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, unionNode.getLeftChild());
-        rightExec = createPlanRecursive(ctx, unionNode.getRightChild());
+        stack.push(unionNode);
+        leftExec = createPlanRecursive(ctx, unionNode.getLeftChild(), stack);
+        rightExec = createPlanRecursive(ctx, unionNode.getRightChild(), stack);
+        stack.pop();
         return new UnionExec(ctx, leftExec, rightExec);
 
       case LIMIT:
         LimitNode limitNode = (LimitNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, limitNode.getChild());
+        stack.push(limitNode);
+        leftExec = createPlanRecursive(ctx, limitNode.getChild(), stack);
+        stack.pop();
         return new LimitExec(ctx, limitNode.getInSchema(),
             limitNode.getOutSchema(), leftExec, limitNode);
 
@@ -745,41 +771,65 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     return new SortBasedColPartitionStoreExec(context, storeTableNode, sortExec);
   }
 
-  public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode) throws IOException {
-    Preconditions.checkNotNull(ctx.getTable(scanNode.getCanonicalName()),
-        "Error: There is no table matched to %s", scanNode.getCanonicalName() + "(" + scanNode.getTableName() + ")");
-
+  private boolean checkIfSortEquivalance(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node) {
     Enforcer enforcer = ctx.getEnforcer();
+    List<EnforceProperty> property = enforcer.getEnforceProperties(EnforceType.SORTED_INPUT);
+    if (property != null && property.size() > 0 && node.peek().getType() == NodeType.SORT) {
+      SortNode sortNode = (SortNode) node.peek();
+      TajoWorkerProtocol.SortedInputEnforce sortEnforcer = property.get(0).getSortedInput();
+
+      boolean condition = scanNode.getTableName().equals(sortEnforcer.getTableName());
+      SortSpec [] sortSpecs = PlannerUtil.convertSortSpecs(sortEnforcer.getSortSpecsList());
+      return condition && TUtil.checkEquals(sortNode.getSortKeys(), sortSpecs);
+    } else {
+      return false;
+    }
+  }
 
-    // check if this table is broadcasted one or not.
-    boolean broadcastFlag = false;
-    if (enforcer != null && enforcer.hasEnforceProperty(EnforceType.BROADCAST)) {
-      List<EnforceProperty> properties = enforcer.getEnforceProperties(EnforceType.BROADCAST);
-      for (EnforceProperty property : properties) {
-        broadcastFlag |= scanNode.getCanonicalName().equals(property.getBroadcast().getTableName());
+  public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node)
+      throws IOException {
+    Preconditions.checkNotNull(ctx.getTable(scanNode.getCanonicalName()),
+        "Error: There is no table matched to %s", scanNode.getCanonicalName() + "(" + scanNode.getTableName() + ")");    
+
+    // Check whether the input is already sorted in the same order as the subsequent sort operator.
+    // TODO - it works only if input files are raw files. We should check the file format.
+    // Since the default intermediate file format is raw file, it is not a problem right now.
+    if (checkIfSortEquivalance(ctx, scanNode, node)) {
+      FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
+      return new ExternalSortExec(ctx, sm, (SortNode) node.peek(), fragments);
+    } else {
+      Enforcer enforcer = ctx.getEnforcer();
+
+      // check if this table is broadcasted one or not.
+      boolean broadcastFlag = false;
+      if (enforcer != null && enforcer.hasEnforceProperty(EnforceType.BROADCAST)) {
+        List<EnforceProperty> properties = enforcer.getEnforceProperties(EnforceType.BROADCAST);
+        for (EnforceProperty property : properties) {
+          broadcastFlag |= scanNode.getCanonicalName().equals(property.getBroadcast().getTableName());
+        }
       }
-    }
 
-    if (scanNode instanceof PartitionedTableScanNode
-        && ((PartitionedTableScanNode)scanNode).getInputPaths() != null &&
-        ((PartitionedTableScanNode)scanNode).getInputPaths().length > 0) {
+      if (scanNode instanceof PartitionedTableScanNode
+          && ((PartitionedTableScanNode)scanNode).getInputPaths() != null &&
+          ((PartitionedTableScanNode)scanNode).getInputPaths().length > 0) {
 
-      if (scanNode instanceof PartitionedTableScanNode) {
-        if (broadcastFlag) {
-          PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode;
-          List<FileFragment> fileFragments = TUtil.newList();
-          for (Path path : partitionedTableScanNode.getInputPaths()) {
-            fileFragments.addAll(TUtil.newList(sm.split(scanNode.getCanonicalName(), path)));
-          }
+        if (scanNode instanceof PartitionedTableScanNode) {
+          if (broadcastFlag) {
+            PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode;
+            List<FileFragment> fileFragments = TUtil.newList();
+            for (Path path : partitionedTableScanNode.getInputPaths()) {
+              fileFragments.addAll(TUtil.newList(sm.split(scanNode.getCanonicalName(), path)));
+            }
 
-          return new PartitionMergeScanExec(ctx, sm, scanNode,
-              FragmentConvertor.toFragmentProtoArray(fileFragments.toArray(new FileFragment[fileFragments.size()])));
+            return new PartitionMergeScanExec(ctx, sm, scanNode,
+                FragmentConvertor.toFragmentProtoArray(fileFragments.toArray(new FileFragment[fileFragments.size()])));
+          }
         }
       }
-    }
 
-    FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
-    return new SeqScanExec(ctx, sm, scanNode, fragments);
+      FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
+      return new SeqScanExec(ctx, sm, scanNode, fragments);
+    }
   }
 
   public PhysicalExec createGroupByPlan(TaskAttemptContext context,GroupbyNode groupbyNode, PhysicalExec subOp)
@@ -858,6 +908,17 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
   public PhysicalExec createSortPlan(TaskAttemptContext context, SortNode sortNode,
                                      PhysicalExec child) throws IOException {
+
+    // Check if it is a distributed merge sort.
+    // If so, it does not need to create a sort executor because
+    // the sort executor is already created at the scan planning.
+    if (child instanceof SortExec) {
+      SortExec childSortExec = (SortExec) child;
+      if (TUtil.checkEquals(sortNode.getSortKeys(), childSortExec.getSortSpecs())) {
+        return child;
+      }
+    }
+
     Enforcer enforcer = context.getEnforcer();
     EnforceProperty property = getAlgorithmEnforceProperty(enforcer, sortNode);
     if (property != null) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index b59cdda..9ada2ee 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -30,6 +30,7 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.exception.InvalidQueryException;
@@ -749,4 +750,13 @@ public class PlannerUtil {
     }
     return names;
   }
+
+  public static SortSpec [] convertSortSpecs(Collection<CatalogProtos.SortSpecProto> sortSpecProtos) {
+    SortSpec [] sortSpecs = new SortSpec[sortSpecProtos.size()];
+    int i = 0;
+    for (CatalogProtos.SortSpecProto proto : sortSpecProtos) {
+      sortSpecs[i++] = new SortSpec(proto);
+    }
+    return sortSpecs;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
index 1843b5a..fe93f46 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
@@ -58,11 +58,11 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationStat
 
   @Override
   public Expr visitRelation(VerificationState state, Stack<Expr> stack, Relation expr) throws PlanningException {
-    checkRelationExistence(state, expr.getName());
+    assertRelationExistence(state, expr.getName());
     return expr;
   }
 
-  private boolean checkRelationExistence(VerificationState state, String name) {
+  private boolean assertRelationExistence(VerificationState state, String name) {
     if (!catalog.existsTable(name)) {
       state.addVerification(String.format("relation \"%s\" does not exist", name));
       return false;
@@ -70,15 +70,34 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationStat
     return true;
   }
 
+  private boolean assertRelationNoExistence(VerificationState state, String name) {
+    if (catalog.existsTable(name)) {
+      state.addVerification(String.format("relation \"%s\" already exists", name));
+      return false;
+    }
+    return true;
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Data Definition Language Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public Expr visitCreateTable(VerificationState state, Stack<Expr> stack, CreateTable expr) throws PlanningException {
+    super.visitCreateTable(state, stack, expr);
+    assertRelationNoExistence(state, expr.getTableName());
+    return expr;
+  }
+
   ///////////////////////////////////////////////////////////////////////////////////////////////////////////
   // Insert or Update Section
   ///////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-  public Expr visitInsert(VerificationState context, Stack<Expr> stack, Insert expr) throws PlanningException {
-    Expr child = super.visitInsert(context, stack, expr);
+  public Expr visitInsert(VerificationState state, Stack<Expr> stack, Insert expr) throws PlanningException {
+    Expr child = super.visitInsert(state, stack, expr);
 
     if (expr.hasTableName()) {
-      checkRelationExistence(context, expr.getTableName());
+      assertRelationExistence(state, expr.getTableName());
     }
 
     if (child != null && child.getType() == OpType.Projection) {
@@ -88,9 +107,9 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationStat
         int targetColumnNum = expr.getTargetColumns().length;
 
         if (targetColumnNum > projectColumnNum)  {
-          context.addVerification("INSERT has more target columns than expressions");
+          state.addVerification("INSERT has more target columns than expressions");
         } else if (targetColumnNum < projectColumnNum) {
-          context.addVerification("INSERT has more expressions than target columns");
+          state.addVerification("INSERT has more expressions than target columns");
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
index fc56b55..5bff857 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.engine.planner;
 
 import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.storage.Tuple;
@@ -28,7 +28,7 @@ import org.apache.tajo.storage.TupleRange;
 import java.math.BigDecimal;
 
 public abstract class RangePartitionAlgorithm {
-  protected Schema schema;
+  protected SortSpec [] sortSpecs;
   protected TupleRange range;
   protected final BigDecimal totalCard;
   /** true if the end of the range is inclusive. Otherwise, it should be false. */
@@ -36,15 +36,15 @@ public abstract class RangePartitionAlgorithm {
 
   /**
    *
-   * @param schema the schema of the range tuples
-   * @param range range to be partition
+   * @param sortSpecs The array of sort keys
+   * @param totalRange The total range to be partitioned
    * @param inclusive true if the end of the range is inclusive. Otherwise, false.
    */
-  public RangePartitionAlgorithm(Schema schema, TupleRange range, boolean inclusive) {
-    this.schema = schema;
-    this.range = range;
+  public RangePartitionAlgorithm(SortSpec [] sortSpecs, TupleRange totalRange, boolean inclusive) {
+    this.sortSpecs = sortSpecs;
+    this.range = totalRange;
     this.inclusive = inclusive;
-    this.totalCard = computeCardinalityForAllColumns(schema, range, inclusive);
+    this.totalCard = computeCardinalityForAllColumns(sortSpecs, totalRange, inclusive);
   }
 
   /**
@@ -56,7 +56,7 @@ public abstract class RangePartitionAlgorithm {
    * @return
    */
   public static BigDecimal computeCardinality(DataType dataType, Datum start, Datum end,
-                                              boolean inclusive) {
+                                              boolean inclusive, boolean isAscending) {
     BigDecimal columnCard;
 
     switch (dataType.getType()) {
@@ -64,35 +64,75 @@ public abstract class RangePartitionAlgorithm {
         columnCard = new BigDecimal(2);
         break;
       case CHAR:
-        columnCard = new BigDecimal(end.asChar() - start.asChar());
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asChar() - start.asChar());
+        } else {
+          columnCard = new BigDecimal(start.asChar() - end.asChar());
+        }
         break;
       case BIT:
-        columnCard = new BigDecimal(end.asByte() - start.asByte());
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asByte() - start.asByte());
+        } else {
+          columnCard = new BigDecimal(start.asByte() - end.asByte());
+        }
         break;
       case INT2:
-        columnCard = new BigDecimal(end.asInt2() - start.asInt2());
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt2() - start.asInt2());
+        } else {
+          columnCard = new BigDecimal(start.asInt2() - end.asInt2());
+        }
         break;
       case INT4:
-        columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+        } else {
+          columnCard = new BigDecimal(start.asInt4() - end.asInt4());
+        }
         break;
       case INT8:
-        columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+        } else {
+          columnCard = new BigDecimal(start.asInt8() - end.asInt8());
+        }
         break;
       case FLOAT4:
-        columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+        } else {
+          columnCard = new BigDecimal(start.asInt4() - end.asInt4());
+        }
         break;
       case FLOAT8:
-        columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+        } else {
+          columnCard = new BigDecimal(start.asInt8() - end.asInt8());
+        }
         break;
       case TEXT:
-        columnCard = new BigDecimal(end.asChars().charAt(0) - start.asChars().charAt(0));
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asChars().charAt(0) - start.asChars().charAt(0));
+        } else {
+          columnCard = new BigDecimal(start.asChars().charAt(0) - end.asChars().charAt(0));
+        }
         break;
       case DATE:
-        columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+        } else {
+          columnCard = new BigDecimal(start.asInt4() - end.asInt4());
+        }
         break;
       case TIME:
       case TIMESTAMP:
-        columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+        } else {
+          columnCard = new BigDecimal(start.asInt8() - end.asInt8());
+        }
         break;
       default:
         throw new UnsupportedOperationException(dataType + " is not supported yet");
@@ -105,16 +145,16 @@ public abstract class RangePartitionAlgorithm {
    * It computes the value cardinality of a tuple range.
    * @return
    */
-  public static BigDecimal computeCardinalityForAllColumns(Schema schema, TupleRange range, boolean inclusive) {
+  public static BigDecimal computeCardinalityForAllColumns(SortSpec[] sortSpecs, TupleRange range, boolean inclusive) {
     Tuple start = range.getStart();
     Tuple end = range.getEnd();
     Column col;
 
     BigDecimal cardinality = new BigDecimal(1);
     BigDecimal columnCard;
-    for (int i = 0; i < schema.getColumnNum(); i++) {
-      col = schema.getColumn(i);
-      columnCard = computeCardinality(col.getDataType(), start.get(i), end.get(i), inclusive);
+    for (int i = 0; i < sortSpecs.length; i++) {
+      columnCard = computeCardinality(sortSpecs[i].getSortKey().getDataType(), start.get(i), end.get(i), inclusive,
+          sortSpecs[i].isAscending());
 
       if (new BigDecimal(0).compareTo(columnCard) < 0) {
         cardinality = cardinality.multiply(columnCard);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
index 4f18c95..948b19e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
@@ -21,7 +21,7 @@ package org.apache.tajo.engine.planner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.exception.RangeOverflowException;
@@ -33,6 +33,7 @@ import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.util.List;
 
+
 public class UniformRangePartition extends RangePartitionAlgorithm {
   private int variableId;
   private BigDecimal[] cardForEachDigit;
@@ -40,16 +41,16 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
 
   /**
    *
-   * @param schema
-   * @param range
+   * @param totalRange
+   * @param sortSpecs The description of sort keys
    * @param inclusive true if the end of the range is inclusive
    */
-  public UniformRangePartition(Schema schema, TupleRange range, boolean inclusive) {
-    super(schema, range, inclusive);
-    colCards = new BigDecimal[schema.getColumnNum()];
-    for (int i = 0; i < schema.getColumnNum(); i++) {
-      colCards[i] = computeCardinality(schema.getColumn(i).getDataType(), range.getStart().get(i),
-          range.getEnd().get(i), inclusive);
+  public UniformRangePartition(TupleRange totalRange, SortSpec[] sortSpecs, boolean inclusive) {
+    super(sortSpecs, totalRange, inclusive);
+    colCards = new BigDecimal[sortSpecs.length];
+    for (int i = 0; i < sortSpecs.length; i++) {
+      colCards[i] = computeCardinality(sortSpecs[i].getSortKey().getDataType(), totalRange.getStart().get(i),
+          totalRange.getEnd().get(i), inclusive, sortSpecs[i].isAscending());
     }
 
     cardForEachDigit = new BigDecimal[colCards.length];
@@ -62,15 +63,15 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
     }
   }
 
-  public UniformRangePartition(Schema schema, TupleRange range) {
-    this(schema, range, true);
+  public UniformRangePartition(TupleRange range, SortSpec [] sortSpecs) {
+    this(range, sortSpecs, true);
   }
 
   @Override
   public TupleRange[] partition(int partNum) {
     Preconditions.checkArgument(partNum > 0,
         "The number of partitions must be positive, but the given number: "
-        + partNum);
+            + partNum);
     Preconditions.checkArgument(totalCard.compareTo(new BigDecimal(partNum)) >= 0,
         "the number of partition cannot exceed total cardinality (" + totalCard + ")");
 
@@ -97,10 +98,10 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
     Tuple last = range.getStart();
     while(reminder.compareTo(new BigDecimal(0)) > 0) {
       if (reminder.compareTo(term) <= 0) { // final one is inclusive
-        ranges.add(new TupleRange(schema, last, range.getEnd()));
+        ranges.add(new TupleRange(sortSpecs, last, range.getEnd()));
       } else {
         Tuple next = increment(last, term.longValue(), variableId);
-        ranges.add(new TupleRange(schema, last, next));
+        ranges.add(new TupleRange(sortSpecs, last, next));
       }
       last = ranges.get(ranges.size() - 1).getEnd();
       reminder = reminder.subtract(term);
@@ -109,49 +110,103 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
     return ranges.toArray(new TupleRange[ranges.size()]);
   }
 
-  public boolean isOverflow(int colId, Datum last, BigDecimal inc) {
-    Column column = schema.getColumn(colId);
+  /**
+   * Check whether an overflow occurs or not.
+   *
+   * @param colId The column id to be checked
+   * @param last
+   * @param inc
+   * @param sortSpecs
+   * @return
+   */
+  public boolean isOverflow(int colId, Datum last, BigDecimal inc, SortSpec [] sortSpecs) {
+    Column column = sortSpecs[colId].getSortKey();
     BigDecimal candidate;
     boolean overflow = false;
+
     switch (column.getDataType().getType()) {
       case BIT: {
-        candidate = inc.add(new BigDecimal(last.asByte()));
-        return new BigDecimal(range.getEnd().get(colId).asByte()).compareTo(candidate) < 0;
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal(last.asByte()));
+          return new BigDecimal(range.getEnd().get(colId).asByte()).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal(last.asByte()).subtract(inc);
+          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asByte())) < 0;
+        }
       }
       case CHAR: {
-        candidate = inc.add(new BigDecimal((int)last.asChar()));
-        return new BigDecimal((int)range.getEnd().get(colId).asChar()).compareTo(candidate) < 0;
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal((int)last.asChar()));
+          return new BigDecimal((int)range.getEnd().get(colId).asChar()).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal((int)last.asChar()).subtract(inc);
+          return candidate.compareTo(new BigDecimal((int)range.getEnd().get(colId).asChar())) < 0;
+        }
       }
       case INT2: {
-        candidate = inc.add(new BigDecimal(last.asInt2()));
-        return new BigDecimal(range.getEnd().get(colId).asInt2()).compareTo(candidate) < 0;
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal(last.asInt2()));
+          return new BigDecimal(range.getEnd().get(colId).asInt2()).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal(last.asInt2()).subtract(inc);
+          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asInt2())) < 0;
+        }
       }
+      case DATE:
       case INT4: {
-        candidate = inc.add(new BigDecimal(last.asInt4()));
-        return new BigDecimal(range.getEnd().get(colId).asInt4()).compareTo(candidate) < 0;
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal(last.asInt4()));
+          return new BigDecimal(range.getEnd().get(colId).asInt4()).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal(last.asInt4()).subtract(inc);
+          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asInt4())) < 0;
+        }
       }
+      case TIME:
+      case TIMESTAMP:
       case INT8: {
-        candidate = inc.add(new BigDecimal(last.asInt8()));
-        return new BigDecimal(range.getEnd().get(colId).asInt8()).compareTo(candidate) < 0;
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal(last.asInt8()));
+          return new BigDecimal(range.getEnd().get(colId).asInt8()).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal(last.asInt8()).subtract(inc);
+          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asInt8())) < 0;
+        }
       }
       case FLOAT4: {
-        candidate = inc.add(new BigDecimal(last.asFloat4()));
-        return new BigDecimal(range.getEnd().get(colId).asFloat4()).compareTo(candidate) < 0;
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal(last.asFloat4()));
+          return new BigDecimal(range.getEnd().get(colId).asFloat4()).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal(last.asFloat4()).subtract(inc);
+          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asFloat4())) < 0;
+        }
       }
       case FLOAT8: {
-        candidate = inc.add(new BigDecimal(last.asFloat8()));
-        return new BigDecimal(range.getEnd().get(colId).asFloat8()).compareTo(candidate) < 0;
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal(last.asFloat8()));
+          return new BigDecimal(range.getEnd().get(colId).asFloat8()).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal(last.asFloat8()).subtract(inc);
+          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asFloat8())) < 0;
+        }
+
       }
       case TEXT: {
-        candidate = inc.add(new BigDecimal((int)(last.asChars().charAt(0))));
-        return new BigDecimal(range.getEnd().get(colId).asChars().charAt(0)).compareTo(candidate) < 0;
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal((int)(last.asChars().charAt(0))));
+          return new BigDecimal(range.getEnd().get(colId).asChars().charAt(0)).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal((int)(last.asChars().charAt(0))).subtract(inc);
+          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asChars().charAt(0))) < 0;
+        }
       }
     }
     return overflow;
   }
 
   public long incrementAndGetReminder(int colId, Datum last, long inc) {
-    Column column = schema.getColumn(colId);
+    Column column = sortSpecs[colId].getSortKey();
     long reminder = 0;
     switch (column.getDataType().getType()) {
       case BIT: {
@@ -166,12 +221,15 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
         reminder = candidate - end;
         break;
       }
+      case DATE:
       case INT4: {
         int candidate = (int) (last.asInt4() + inc);
         int end = range.getEnd().get(colId).asInt4();
         reminder = candidate - end;
         break;
       }
+      case TIME:
+      case TIMESTAMP:
       case INT8: {
         long candidate = last.asInt8() + inc;
         long end = range.getEnd().get(colId).asInt8();
@@ -231,7 +289,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
     int finalId = baseDigit;
     incs[finalId] = value;
     for (int i = finalId; i >= 0; i--) {
-      if (isOverflow(i, last.get(i), incs[i])) {
+      if (isOverflow(i, last.get(i), incs[i], sortSpecs)) {
         if (i == 0) {
           throw new RangeOverflowException(range, last, incs[i].longValue());
         }
@@ -253,10 +311,10 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
       }
     }
 
-    Tuple end = new VTuple(schema.getColumnNum());
+    Tuple end = new VTuple(sortSpecs.length);
     Column column;
     for (int i = 0; i < last.size(); i++) {
-      column = schema.getColumn(i);
+      column = sortSpecs[i].getSortKey();
       switch (column.getDataType().getType()) {
         case CHAR:
           if (overflowFlag[i]) {
@@ -286,13 +344,17 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
             end.put(i, DatumFactory.createInt4(
                 (int) (range.getStart().get(i).asInt4() + incs[i].longValue())));
           } else {
-            end.put(i, DatumFactory.createInt4((int) (last.get(i).asInt4() + incs[i].longValue())));
+            if (sortSpecs[i].isAscending()) {
+              end.put(i, DatumFactory.createInt4((int) (last.get(i).asInt4() + incs[i].longValue())));
+            } else {
+              end.put(i, DatumFactory.createInt4((int) (last.get(i).asInt4() - incs[i].longValue())));
+            }
           }
           break;
         case INT8:
           if (overflowFlag[i]) {
             end.put(i, DatumFactory.createInt8(
-                range.getStart().get(i).asInt4() + incs[i].longValue()));
+                range.getStart().get(i).asInt8() + incs[i].longValue()));
           } else {
             end.put(i, DatumFactory.createInt8(last.get(i).asInt8() + incs[i].longValue()));
           }
@@ -322,6 +384,28 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
                 ((char) (last.get(i).asChars().charAt(0) + incs[i].longValue())) + ""));
           }
           break;
+        case DATE:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createDate((int) (range.getStart().get(i).asInt4() + incs[i].longValue())));
+          } else {
+            end.put(i, DatumFactory.createDate((int) (last.get(i).asInt4() + incs[i].longValue())));
+          }
+          break;
+        case TIME:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createTime(range.getStart().get(i).asInt8() + incs[i].longValue()));
+          } else {
+            end.put(i, DatumFactory.createTime(last.get(i).asInt8() + incs[i].longValue()));
+          }
+          break;
+        case TIMESTAMP:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createTimeStampFromMillis(
+                range.getStart().get(i).asInt8() + incs[i].longValue()));
+          } else {
+            end.put(i, DatumFactory.createTimeStampFromMillis(last.get(i).asInt8() + incs[i].longValue()));
+          }
+          break;
         default:
           throw new UnsupportedOperationException(column.getDataType() + " is not supported yet");
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 2aa93d7..d040d7e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -469,6 +469,7 @@ public class GlobalPlanner {
         currentNode.setChild(secondScan);
         currentNode.setInSchema(secondScan.getOutSchema());
         currentBlock.setPlan(currentNode);
+        currentBlock.getEnforcer().addSortedInput(secondScan.getTableName(), currentNode.getSortKeys());
       }
     } else {
       LogicalNode childBlockPlan = childBlock.getPlan();
@@ -487,6 +488,7 @@ public class GlobalPlanner {
       currentNode.setChild(secondScan);
       currentNode.setInSchema(secondScan.getOutSchema());
       currentBlock.setPlan(currentNode);
+      currentBlock.getEnforcer().addSortedInput(secondScan.getTableName(), currentNode.getSortKeys());
       masterPlan.addConnect(channel);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index a3b37fc..f0ac290 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -27,11 +27,14 @@ import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.logical.SortNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
@@ -76,8 +79,10 @@ public class ExternalSortExec extends SortExec {
   private final LocalDirAllocator localDirAllocator;
   /** local file system */
   private final RawLocalFileSystem localFS;
-  /** final output files */
+  /** final output files which are used for cleaning */
   private List<Path> finalOutputFiles = null;
+  /** for directly merging sorted inputs */
+  private List<Path> mergedInputPaths = null;
 
   ///////////////////////////////////////////////////
   // transient variables
@@ -89,10 +94,10 @@ public class ExternalSortExec extends SortExec {
   /** the final result */
   private Scanner result;
 
-  public ExternalSortExec(final TaskAttemptContext context,
-                          final AbstractStorageManager sm, final SortNode plan, final PhysicalExec child)
-      throws IOException {
-    super(context, plan.getInSchema(), plan.getOutSchema(), child, plan.getSortKeys());
+  private ExternalSortExec(final TaskAttemptContext context, final AbstractStorageManager sm, final SortNode plan)
+      throws PhysicalPlanningException {
+    super(context, plan.getInSchema(), plan.getOutSchema(), null, plan.getSortKeys());
+
     this.plan = plan;
     this.meta = CatalogUtil.newTableMeta(StoreType.ROWFILE);
 
@@ -111,6 +116,25 @@ public class ExternalSortExec extends SortExec {
     localFS = new RawLocalFileSystem();
   }
 
+  public ExternalSortExec(final TaskAttemptContext context,
+                          final AbstractStorageManager sm, final SortNode plan,
+                          final CatalogProtos.FragmentProto[] fragments) throws PhysicalPlanningException {
+    this(context, sm, plan);
+
+    mergedInputPaths = TUtil.newList();
+    for (CatalogProtos.FragmentProto proto : fragments) {
+      FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto);
+      mergedInputPaths.add(fragment.getPath());
+    }
+  }
+
+  public ExternalSortExec(final TaskAttemptContext context,
+                          final AbstractStorageManager sm, final SortNode plan, final PhysicalExec child)
+      throws IOException {
+    this(context, sm, plan);
+    setChild(child);
+  }
+
   public void init() throws IOException {
     super.init();
   }
@@ -120,9 +144,9 @@ public class ExternalSortExec extends SortExec {
   }
 
   /**
-   * Sort tuple block and store them into a chunk file
+   * Sort a tuple block and store it into a chunk file
    */
-  private final Path sortAndStoreChunk(int chunkId, List<Tuple> tupleBlock)
+  private Path sortAndStoreChunk(int chunkId, List<Tuple> tupleBlock)
       throws IOException {
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.RAW);
     int rowNum = tupleBlock.size();
@@ -156,7 +180,7 @@ public class ExternalSortExec extends SortExec {
    * @return All paths of chunks
    * @throws java.io.IOException
    */
-  private final List<Path> sortAndStoreAllChunks() throws IOException {
+  private List<Path> sortAndStoreAllChunks() throws IOException {
     Tuple tuple;
     int memoryConsumption = 0;
     List<Path> chunkPaths = TUtil.newList();
@@ -212,22 +236,31 @@ public class ExternalSortExec extends SortExec {
 
     if (!sorted) { // if not sorted, first sort all data
 
-      // Try to sort all data, and store them into a number of chunks if memory exceeds
-      long startTimeOfChunkSplit = System.currentTimeMillis();
-      List<Path> chunks = sortAndStoreAllChunks();
-      long endTimeOfChunkSplit = System.currentTimeMillis();
-      info(LOG, "Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec");
-
-      if (memoryResident) { // if all sorted data reside in a main-memory table.
-        this.result = new MemTableScanner();
-      } else { // if input data exceeds main-memory at least once
-
+      // if input files are given, it starts merging directly.
+      if (mergedInputPaths != null) {
         try {
-          this.result = externalMergeAndSort(chunks);
-        } catch (Exception exception) {
-          throw new PhysicalPlanningException(exception);
+          this.result = externalMergeAndSort(mergedInputPaths);
+        } catch (Exception e) {
+          throw new PhysicalPlanningException(e);
         }
+      } else {
+        // Try to sort all data, and store it as multiple chunks if the data exceeds main memory
+        long startTimeOfChunkSplit = System.currentTimeMillis();
+        List<Path> chunks = sortAndStoreAllChunks();
+        long endTimeOfChunkSplit = System.currentTimeMillis();
+        info(LOG, "Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec");
+
+        if (memoryResident) { // if all sorted data reside in a main-memory table.
+          this.result = new MemTableScanner();
+        } else { // if input data exceeds main-memory at least once
+
+          try {
+            this.result = externalMergeAndSort(chunks);
+          } catch (Exception e) {
+            throw new PhysicalPlanningException(e);
+          }
 
+        }
       }
 
       sorted = true;
@@ -511,7 +544,7 @@ public class ExternalSortExec extends SortExec {
           outTuple = rightTuple;
           rightTuple = rightScan.next();
         }
-        return outTuple;
+        return new VTuple(outTuple);
       }
 
       if (leftTuple == null) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
index a8dd877..f31455f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
@@ -44,14 +44,20 @@ public abstract class UnaryPhysicalExec extends PhysicalExec {
   }
 
   public void init() throws IOException {
-    child.init();
+    if (child != null) {
+      child.init();
+    }
   }
 
   public void rescan() throws IOException {
-    child.rescan();
+    if (child != null) {
+      child.rescan();
+    }
   }
 
   public void close() throws IOException {
-    child.close();
+    if (child != null) {
+      child.close();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
index 42508a0..b96b65e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -22,26 +22,20 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.statistics.ColumnStats;
-import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.worker.dataserver.HttpUtil;
 
 import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.util.Collection;
 import java.util.Iterator;
@@ -49,279 +43,8 @@ import java.util.List;
 import java.util.Map;
 
 public class TupleUtil {
-  /** class logger **/
-  private static final Log LOG = LogFactory.getLog(TupleUtil.class);
 
-  /**
-   * It computes the value cardinality of a tuple range.
-   *
-   * @param schema
-   * @param range
-   * @return
-   */
-  public static long computeCardinality(Schema schema, TupleRange range) {
-    Tuple start = range.getStart();
-    Tuple end = range.getEnd();
-    Column col;
-
-    long cardinality = 1;
-    long columnCard;
-    for (int i = 0; i < schema.getColumnNum(); i++) {
-      col = schema.getColumn(i);
-      switch (col.getDataType().getType()) {
-        case CHAR:
-          columnCard = end.get(i).asChar() - start.get(i).asChar();
-          break;
-        case BIT:
-          columnCard = end.get(i).asByte() - start.get(i).asByte();
-          break;
-        case INT2:
-          columnCard = end.get(i).asInt2() - start.get(i).asInt2();
-          break;
-        case INT4:
-          columnCard = end.get(i).asInt4() - start.get(i).asInt4();
-          break;
-        case INT8:
-          columnCard = end.get(i).asInt8() - start.get(i).asInt8();
-          break;
-        case FLOAT4:
-          columnCard = end.get(i).asInt4() - start.get(i).asInt4();
-          break;
-        case FLOAT8:
-          columnCard = end.get(i).asInt8() - start.get(i).asInt8();
-          break;
-        case TEXT:
-          columnCard = end.get(i).asChars().charAt(0) - start.get(i).asChars().charAt(0);
-          break;
-        default:
-          throw new UnsupportedOperationException(col.getDataType() + " is not supported yet");
-      }
-
-      if (columnCard > 0) {
-        cardinality *= columnCard + 1;
-      }
-    }
-
-    return cardinality;
-  }
-
-  public static TupleRange [] getPartitions(Schema schema, int partNum, TupleRange range) {
-    Tuple start = range.getStart();
-    Tuple end = range.getEnd();
-    Column col;
-    TupleRange [] partitioned = new TupleRange[partNum];
-
-    Datum[] term = new Datum[schema.getColumnNum()];
-    Datum[] prevValues = new Datum[schema.getColumnNum()];
-
-    // initialize term and previous values
-    for (int i = 0; i < schema.getColumnNum(); i++) {
-      col = schema.getColumn(i);
-      prevValues[i] = start.get(i);
-      switch (col.getDataType().getType()) {
-        case CHAR:
-          int sChar = start.get(i).asChar();
-          int eChar = end.get(i).asChar();
-          int rangeChar;
-          if ((eChar - sChar) > partNum) {
-            rangeChar = (eChar - sChar) / partNum;
-          } else {
-            rangeChar = 1;
-          }
-          term[i] = DatumFactory.createInt4(rangeChar);
-        case BIT:
-          byte sByte = start.get(i).asByte();
-          byte eByte = end.get(i).asByte();
-          int rangeByte;
-          if ((eByte - sByte) > partNum) {
-            rangeByte = (eByte - sByte) / partNum;
-          } else {
-            rangeByte = 1;
-          }
-          term[i] = DatumFactory.createBit((byte) rangeByte);
-          break;
-
-        case INT2:
-          short sShort = start.get(i).asInt2();
-          short eShort = end.get(i).asInt2();
-          int rangeShort;
-          if ((eShort - sShort) > partNum) {
-            rangeShort = (eShort - sShort) / partNum;
-          } else {
-            rangeShort = 1;
-          }
-          term[i] = DatumFactory.createInt2((short) rangeShort);
-          break;
-
-        case INT4:
-          int sInt = start.get(i).asInt4();
-          int eInt = end.get(i).asInt4();
-          int rangeInt;
-          if ((eInt - sInt) > partNum) {
-            rangeInt = (eInt - sInt) / partNum;
-          } else {
-            rangeInt = 1;
-          }
-          term[i] = DatumFactory.createInt4(rangeInt);
-          break;
-
-        case INT8:
-          long sLong = start.get(i).asInt8();
-          long eLong = end.get(i).asInt8();
-          long rangeLong;
-          if ((eLong - sLong) > partNum) {
-            rangeLong = ((eLong - sLong) / partNum);
-          } else {
-            rangeLong = 1;
-          }
-          term[i] = DatumFactory.createInt8(rangeLong);
-          break;
-
-        case FLOAT4:
-          float sFloat = start.get(i).asFloat4();
-          float eFloat = end.get(i).asFloat4();
-          float rangeFloat;
-          if ((eFloat - sFloat) > partNum) {
-            rangeFloat = ((eFloat - sFloat) / partNum);
-          } else {
-            rangeFloat = 1;
-          }
-          term[i] = DatumFactory.createFloat4(rangeFloat);
-          break;
-        case FLOAT8:
-          double sDouble = start.get(i).asFloat8();
-          double eDouble = end.get(i).asFloat8();
-          double rangeDouble;
-          if ((eDouble - sDouble) > partNum) {
-            rangeDouble = ((eDouble - sDouble) / partNum);
-          } else {
-            rangeDouble = 1;
-          }
-          term[i] = DatumFactory.createFloat8(rangeDouble);
-          break;
-        case TEXT:
-          char sChars = start.get(i).asChars().charAt(0);
-          char eChars = end.get(i).asChars().charAt(0);
-          int rangeString;
-          if ((eChars - sChars) > partNum) {
-            rangeString = ((eChars - sChars) / partNum);
-          } else {
-            rangeString = 1;
-          }
-          term[i] = DatumFactory.createText(((char) rangeString) + "");
-          break;
-        case INET4:
-          throw new UnsupportedOperationException();
-        case BLOB:
-          throw new UnsupportedOperationException();
-        default:
-          throw new UnsupportedOperationException();
-      }
-    }
-
-    for (int p = 0; p < partNum; p++) {
-      Tuple sTuple = new VTuple(schema.getColumnNum());
-      Tuple eTuple = new VTuple(schema.getColumnNum());
-      for (int i = 0; i < schema.getColumnNum(); i++) {
-        col = schema.getColumn(i);
-        sTuple.put(i, prevValues[i]);
-        switch (col.getDataType().getType()) {
-          case CHAR:
-            char endChar = (char) (prevValues[i].asChar() + term[i].asChar());
-            if (endChar > end.get(i).asByte()) {
-              eTuple.put(i, end.get(i));
-            } else {
-              eTuple.put(i, DatumFactory.createChar(endChar));
-            }
-            prevValues[i] = DatumFactory.createChar(endChar);
-            break;
-          case BIT:
-            byte endByte = (byte) (prevValues[i].asByte() + term[i].asByte());
-            if (endByte > end.get(i).asByte()) {
-              eTuple.put(i, end.get(i));
-            } else {
-              eTuple.put(i, DatumFactory.createBit(endByte));
-            }
-            prevValues[i] = DatumFactory.createBit(endByte);
-            break;
-          case INT2:
-            int endShort = (short) (prevValues[i].asInt2() + term[i].asInt2());
-            if (endShort > end.get(i).asInt2()) {
-              eTuple.put(i, end.get(i));
-            } else {
-              // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createInt2((short) endShort));
-            }
-            prevValues[i] = DatumFactory.createInt2((short) endShort);
-            break;
-          case INT4:
-            int endInt = (prevValues[i].asInt4() + term[i].asInt4());
-            if (endInt > end.get(i).asInt4()) {
-              eTuple.put(i, end.get(i));
-            } else {
-              // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createInt4(endInt));
-            }
-            prevValues[i] = DatumFactory.createInt4(endInt);
-            break;
-
-          case INT8:
-            long endLong = (prevValues[i].asInt8() + term[i].asInt8());
-            if (endLong > end.get(i).asInt8()) {
-              eTuple.put(i, end.get(i));
-            } else {
-              // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createInt8(endLong));
-            }
-            prevValues[i] = DatumFactory.createInt8(endLong);
-            break;
-
-          case FLOAT4:
-            float endFloat = (prevValues[i].asFloat4() + term[i].asFloat4());
-            if (endFloat > end.get(i).asFloat4()) {
-              eTuple.put(i, end.get(i));
-            } else {
-              // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createFloat4(endFloat));
-            }
-            prevValues[i] = DatumFactory.createFloat4(endFloat);
-            break;
-          case FLOAT8:
-            double endDouble = (prevValues[i].asFloat8() + term[i].asFloat8());
-            if (endDouble > end.get(i).asFloat8()) {
-              eTuple.put(i, end.get(i));
-            } else {
-              // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createFloat8(endDouble));
-            }
-            prevValues[i] = DatumFactory.createFloat8(endDouble);
-            break;
-          case TEXT:
-            String endString = ((char)(prevValues[i].asChars().charAt(0) + term[i].asChars().charAt(0))) + "";
-            if (endString.charAt(0) > end.get(i).asChars().charAt(0)) {
-              eTuple.put(i, end.get(i));
-            } else {
-              // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createText(endString));
-            }
-            prevValues[i] = DatumFactory.createText(endString);
-            break;
-          case INET4:
-            throw new UnsupportedOperationException();
-          case BLOB:
-            throw new UnsupportedOperationException();
-          default:
-            throw new UnsupportedOperationException();
-        }
-      }
-      partitioned[p] = new TupleRange(schema, sTuple, eTuple);
-    }
-
-    return partitioned;
-  }
-
-  public static String rangeToQuery(Schema schema, TupleRange range,
-                                    boolean ascendingFirstKey, boolean last)
+  public static String rangeToQuery(Schema schema, TupleRange range, boolean last)
       throws UnsupportedEncodingException {
     StringBuilder sb = new StringBuilder();
     byte [] firstKeyBytes = RowStoreUtil.RowStoreEncoder
@@ -345,32 +68,8 @@ public class TupleUtil {
     return sb.toString();
   }
 
-  public static String [] rangesToQueries(final SortSpec[] sortSpec,
-                                          final TupleRange[] ranges)
-      throws UnsupportedEncodingException {
-    Schema schema = PlannerUtil.sortSpecsToSchema(sortSpec);
-    boolean ascendingFirstKey = sortSpec[0].isAscending();
-    String [] params = new String[ranges.length];
-    for (int i = 0; i < ranges.length; i++) {
-      params[i] =
-        rangeToQuery(schema, ranges[i], ascendingFirstKey,
-            ascendingFirstKey ? i == (ranges.length - 1) : i == 0);
-    }
-    return params;
-  }
-
-  public static TupleRange queryToRange(Schema schema, String query) throws UnsupportedEncodingException {
-    Map<String,String> params = HttpUtil.getParamsFromQuery(query);
-    String startUrlDecoded = URLDecoder.decode(params.get("start"), "utf-8");
-    String endUrlDecoded = URLDecoder.decode(params.get("end"), "utf-8");
-    byte [] startBytes = Base64.decodeBase64(startUrlDecoded);
-    byte [] endBytes = Base64.decodeBase64(endUrlDecoded);
-    return new TupleRange(schema, RowStoreUtil.RowStoreDecoder
-        .toTuple(schema, startBytes), RowStoreUtil.RowStoreDecoder
-        .toTuple(schema, endBytes));
-  }
+  public static TupleRange columnStatToRange(SortSpec [] sortSpecs, Schema target, List<ColumnStats> colStats) {
 
-  public static TupleRange columnStatToRange(Schema schema, Schema target, List<ColumnStats> colStats) {
     Map<Column, ColumnStats> statSet = Maps.newHashMap();
     for (ColumnStats stat : colStats) {
       statSet.put(stat.getColumn(), stat);
@@ -385,11 +84,16 @@ public class TupleUtil {
     Tuple endTuple = new VTuple(target.getColumnNum());
     int i = 0;
     for (Column col : target.getColumns()) {
-      startTuple.put(i, statSet.get(col).getMinValue());
-      endTuple.put(i, statSet.get(col).getMaxValue());
+      if (sortSpecs[i].isAscending()) {
+        startTuple.put(i, statSet.get(col).getMinValue());
+        endTuple.put(i, statSet.get(col).getMaxValue());
+      } else {
+        startTuple.put(i, statSet.get(col).getMaxValue());
+        endTuple.put(i, statSet.get(col).getMinValue());
+      }
       i++;
     }
-    return new TupleRange(target, startTuple, endTuple);
+    return new TupleRange(sortSpecs, startTuple, endTuple);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 71ab8f9..8ea824e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -308,8 +308,8 @@ public class Repartitioner {
 
     // calculate the number of maximum query ranges
     TableStats totalStat = computeChildBlocksStats(subQuery.getContext(), masterPlan, subQuery.getId());
-    TupleRange mergedRange = TupleUtil.columnStatToRange(channel.getSchema(), sortSchema, totalStat.getColumnStats());
-    RangePartitionAlgorithm partitioner = new UniformRangePartition(sortSchema, mergedRange);
+    TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats());
+    RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs);
     BigDecimal card = partitioner.getTotalCardinality();
 
     // if the number of the range cardinality is less than the desired number of tasks,
@@ -355,8 +355,8 @@ public class Repartitioner {
       for (int i = 0; i < ranges.length; i++) {
         uris = new HashSet<URI>();
         for (String uri: basicFetchURIs) {
-          String rangeParam = TupleUtil.rangeToQuery(sortSchema, ranges[i],
-              ascendingFirstKey, ascendingFirstKey ? i == (ranges.length - 1) : i == 0);
+          String rangeParam =
+              TupleUtil.rangeToQuery(sortSchema, ranges[i], ascendingFirstKey ? i == (ranges.length - 1) : i == 0);
           URI finalUri = URI.create(uri + "&" + rangeParam);
           uris.add(finalUri);
         }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
index 26991a0..a54fa80 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
@@ -29,7 +29,6 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
-import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.index.bst.BSTIndex;
 import org.apache.tajo.worker.dataserver.retriever.FileChunk;
 import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler;
@@ -113,7 +112,7 @@ public class RangeRetrieverHandler implements RetrieverHandler {
       startOffset = idxReader.find(start);
     } catch (IOException ioe) {
       LOG.error("State Dump (the requested range: "
-          + new TupleRange(schema, start, end) + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+          + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
           + idxReader.getLastKey());
       throw ioe;
     }
@@ -124,7 +123,7 @@ public class RangeRetrieverHandler implements RetrieverHandler {
       }
     } catch (IOException ioe) {
       LOG.error("State Dump (the requested range: "
-          + new TupleRange(schema, start, end) + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+          + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
           + idxReader.getLastKey());
       throw ioe;
     }
@@ -136,7 +135,7 @@ public class RangeRetrieverHandler implements RetrieverHandler {
         startOffset = idxReader.find(start, true);
       } catch (IOException ioe) {
         LOG.error("State Dump (the requested range: "
-            + new TupleRange(schema, start, end) + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+            + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
             + idxReader.getLastKey());
         throw ioe;
       }
@@ -145,7 +144,7 @@ public class RangeRetrieverHandler implements RetrieverHandler {
     if (startOffset == -1) {
       throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
           "State Dump (the requested range: "
-          + new TupleRange(schema, start, end) + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+          + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
           + idxReader.getLastKey());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 1b72d13..8a0b63a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -166,7 +166,7 @@ public class Task {
       this.shuffleType = context.getDataChannel().getShuffleType();
 
       if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
-        SortNode sortNode = (SortNode) PlannerUtil.findTopNode(plan, NodeType.SORT);
+        SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
         this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
         this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys());
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
index acd663f..3d5cdf2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.engine.planner;
 
+import org.apache.tajo.catalog.SortSpec;
 import org.junit.Test;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.Type;
@@ -39,6 +40,9 @@ public class TestUniformRangePartition {
     Schema schema = new Schema()
     .addColumn("l_returnflag", Type.TEXT)
     .addColumn("l_linestatus", Type.TEXT);
+
+    SortSpec[] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
     Tuple s = new VTuple(2);
     s.put(0, DatumFactory.createText("A"));
     s.put(1, DatumFactory.createText("A"));
@@ -46,11 +50,10 @@ public class TestUniformRangePartition {
     e.put(0, DatumFactory.createText("D"));
     e.put(1, DatumFactory.createText("C"));
 
-    TupleRange expected = new TupleRange(schema, s, e);
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
 
-    UniformRangePartition partitioner =
-        new UniformRangePartition(schema, expected);
-    assertEquals(12, TupleUtil.computeCardinality(schema, expected));
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    assertEquals(12, partitioner.getTotalCardinality().intValue());
 
     String [] result = new String[12];
     result[0] = "AA";
@@ -84,6 +87,9 @@ public class TestUniformRangePartition {
     Schema schema = new Schema()
     .addColumn("l_returnflag", Type.TEXT)
     .addColumn("l_linestatus", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
     Tuple s = new VTuple(2);
     s.put(0, DatumFactory.createText("A"));
     s.put(1, DatumFactory.createText("A"));
@@ -91,11 +97,10 @@ public class TestUniformRangePartition {
     e.put(0, DatumFactory.createText("D"));
     e.put(1, DatumFactory.createText("C"));
 
-    TupleRange expected = new TupleRange(schema, s, e);
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
 
-    UniformRangePartition partitioner =
-        new UniformRangePartition(schema, expected);
-    assertEquals(12, TupleUtil.computeCardinality(schema, expected));
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    assertEquals(12, partitioner.getTotalCardinality().intValue());
 
     String [] result = new String[12];
     result[0] = "AA";
@@ -129,6 +134,8 @@ public class TestUniformRangePartition {
     .addColumn("l_linestatus", Type.TEXT)
     .addColumn("final", Type.TEXT);
 
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
     Tuple s = new VTuple(3);
     s.put(0, DatumFactory.createText("A"));
     s.put(1, DatumFactory.createText("A"));
@@ -138,11 +145,10 @@ public class TestUniformRangePartition {
     e.put(1, DatumFactory.createText("B")); //  2
     e.put(2, DatumFactory.createText("C")); // x3 = 24
 
-    TupleRange expected = new TupleRange(schema, s, e);
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
 
-    UniformRangePartition partitioner =
-        new UniformRangePartition(schema, expected);
-    assertEquals(24, TupleUtil.computeCardinality(schema, expected));
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    assertEquals(24, partitioner.getTotalCardinality().intValue());
 
     Tuple overflowBefore = partitioner.increment(s, 5, 2);
     assertEquals("A", overflowBefore.get(0).asChars());
@@ -159,6 +165,9 @@ public class TestUniformRangePartition {
     Schema schema = new Schema()
     .addColumn("l_orderkey", Type.INT8)
     .addColumn("l_linenumber", Type.INT8);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
     Tuple s = new VTuple(2);
     s.put(0, DatumFactory.createInt8(10));
     s.put(1, DatumFactory.createInt8(20));
@@ -166,10 +175,9 @@ public class TestUniformRangePartition {
     e.put(0, DatumFactory.createInt8(19));
     e.put(1, DatumFactory.createInt8(39));
 
-    TupleRange expected = new TupleRange(schema, s, e);
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
 
-    UniformRangePartition partitioner =
-        new UniformRangePartition(schema, expected);
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
     assertEquals(200, partitioner.getTotalCardinality().longValue());
 
     Tuple range2 = partitioner.increment(s, 100, 1);
@@ -185,6 +193,9 @@ public class TestUniformRangePartition {
     .addColumn("l_orderkey", Type.INT8)
     .addColumn("l_linenumber", Type.INT8)
     .addColumn("final", Type.INT8);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
     Tuple s = new VTuple(3);
     s.put(0, DatumFactory.createInt8(1));
     s.put(1, DatumFactory.createInt8(1));
@@ -194,10 +205,9 @@ public class TestUniformRangePartition {
     e.put(1, DatumFactory.createInt8(2)); // 2
     e.put(2, DatumFactory.createInt8(3)); //x3 = 24
 
-    TupleRange expected = new TupleRange(schema, s, e);
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
 
-    UniformRangePartition partitioner
-        = new UniformRangePartition(schema, expected);
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
     assertEquals(24, partitioner.getTotalCardinality().longValue());
 
     Tuple beforeOverflow = partitioner.increment(s, 5, 2);
@@ -216,6 +226,9 @@ public class TestUniformRangePartition {
       .addColumn("l_orderkey", Type.FLOAT8)
       .addColumn("l_linenumber", Type.FLOAT8)
       .addColumn("final", Type.FLOAT8);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
     Tuple s = new VTuple(3);
     s.put(0, DatumFactory.createFloat8(1.1d));
     s.put(1, DatumFactory.createFloat8(1.1d));
@@ -225,10 +238,9 @@ public class TestUniformRangePartition {
     e.put(1, DatumFactory.createFloat8(2.1d)); // 2
     e.put(2, DatumFactory.createFloat8(3.1d)); //x3 = 24
 
-    TupleRange expected = new TupleRange(schema, s, e);
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
 
-    UniformRangePartition partitioner =
-        new UniformRangePartition(schema, expected);
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
     assertEquals(24, partitioner.getTotalCardinality().longValue());
 
     Tuple beforeOverflow = partitioner.increment(s, 5, 2);
@@ -246,15 +258,18 @@ public class TestUniformRangePartition {
     Schema schema = new Schema();
     schema.addColumn("l_returnflag", Type.TEXT);
     schema.addColumn("l_linestatus", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
     Tuple s = new VTuple(2);
     s.put(0, DatumFactory.createText("A"));
     s.put(1, DatumFactory.createText("F"));
     Tuple e = new VTuple(2);
     e.put(0, DatumFactory.createText("R"));
     e.put(1, DatumFactory.createText("O"));
-    TupleRange expected = new TupleRange(schema, s, e);
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
     RangePartitionAlgorithm partitioner
-        = new UniformRangePartition(schema, expected, true);
+        = new UniformRangePartition(expected, sortSpecs, true);
     TupleRange [] ranges = partitioner.partition(31);
 
 
@@ -273,15 +288,18 @@ public class TestUniformRangePartition {
     Schema schema = new Schema()
       .addColumn("l_returnflag", Type.TEXT)
       .addColumn("l_linestatus", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
     Tuple s = new VTuple(2);
     s.put(0, DatumFactory.createText("A"));
     s.put(1, DatumFactory.createText("F"));
     Tuple e = new VTuple(2);
     e.put(0, DatumFactory.createText("R"));
     e.put(1, DatumFactory.createText("O"));
-    TupleRange expected = new TupleRange(schema, s, e);
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
     RangePartitionAlgorithm partitioner =
-        new UniformRangePartition(schema, expected, true);
+        new UniformRangePartition(expected, sortSpecs, true);
     TupleRange [] ranges = partitioner.partition(1);
 
     assertEquals(expected, ranges[0]);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index cf24054..97932e7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -51,6 +51,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
+import java.util.Stack;
 
 import static org.junit.Assert.assertEquals;
 
@@ -191,7 +192,7 @@ public class TestBSTIndexExec {
     }
 
     @Override
-    public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode)
+    public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> stack)
         throws IOException {
       Preconditions.checkNotNull(ctx.getTable(scanNode.getTableName()),
           "Error: There is no table matched to %s", scanNode.getTableName());