Posted to commits@tajo.apache.org by hy...@apache.org on 2014/02/10 17:48:26 UTC

[1/2] TAJO-584: Improve distributed merge sort.

Updated Branches:
  refs/heads/master 4179a7c92 -> 214b9741a


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index d7a3eb1..a2a2b54 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -21,8 +21,6 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -35,7 +33,9 @@ import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -74,7 +74,6 @@ public class TestExternalSortExec {
     schema.addColumn("managerId", Type.INT4);
     schema.addColumn("empId", Type.INT4);
     schema.addColumn("deptName", Type.TEXT);
-    schema.addColumn("text_field", Type.TEXT);
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
@@ -87,7 +86,6 @@ public class TestExternalSortExec {
           DatumFactory.createInt4(rnd.nextInt(50)),
           DatumFactory.createInt4(rnd.nextInt(100)),
           DatumFactory.createText("dept_" + i),
-          DatumFactory.createText("f_" + i)
       });
       appender.addTuple(tuple);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
index 4ac423b..e163a29 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
@@ -293,6 +293,7 @@ public class TestLeftOuterHashJoinExec {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec1");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(QUERIES[1]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
 
@@ -333,6 +334,7 @@ public class TestLeftOuterHashJoinExec {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec2");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(QUERIES[2]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
 
@@ -374,6 +376,7 @@ public class TestLeftOuterHashJoinExec {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec3");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(QUERIES[3]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
 
@@ -415,6 +418,7 @@ public class TestLeftOuterHashJoinExec {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec4");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(QUERIES[4]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
index ce31851..0540bc1 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
@@ -19,9 +19,8 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -34,20 +33,21 @@ import org.apache.tajo.engine.planner.LogicalPlanner;
 import org.apache.tajo.engine.planner.PhysicalPlanner;
 import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
 import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.apache.tajo.LocalTajoTestingUtility;
 
 import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 
 public class TestLeftOuterNLJoinExec {
   private TajoConf conf;
@@ -251,6 +251,7 @@ public class TestLeftOuterNLJoinExec {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuterNLJoinExec0");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr context =  analyzer.parse(QUERIES[0]);
     LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
 
@@ -292,6 +293,7 @@ public class TestLeftOuterNLJoinExec {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec1");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr context =  analyzer.parse(QUERIES[1]);
     LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
 
@@ -302,7 +304,7 @@ public class TestLeftOuterNLJoinExec {
     //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
     ProjectionExec proj = (ProjectionExec) exec;
     if (proj.getChild() instanceof HashLeftOuterJoinExec) {
-      HashLeftOuterJoinExec join = (HashLeftOuterJoinExec) proj.getChild();
+      HashLeftOuterJoinExec join = proj.getChild();
       NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
       proj.setChild(aJoin);
       exec = proj;
@@ -335,6 +337,7 @@ public class TestLeftOuterNLJoinExec {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec2");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr context =  analyzer.parse(QUERIES[2]);
     LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
 
@@ -345,7 +348,7 @@ public class TestLeftOuterNLJoinExec {
     //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
     ProjectionExec proj = (ProjectionExec) exec;
     if (proj.getChild() instanceof HashLeftOuterJoinExec) {
-      HashLeftOuterJoinExec join = (HashLeftOuterJoinExec) proj.getChild();
+      HashLeftOuterJoinExec join = proj.getChild();
       NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
       proj.setChild(aJoin);
       exec = proj;
@@ -379,6 +382,7 @@ public class TestLeftOuterNLJoinExec {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec3");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr context =  analyzer.parse(QUERIES[3]);
     LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
 
@@ -389,7 +393,7 @@ public class TestLeftOuterNLJoinExec {
     //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
     ProjectionExec proj = (ProjectionExec) exec;
     if (proj.getChild() instanceof HashLeftOuterJoinExec) {
-      HashLeftOuterJoinExec join = (HashLeftOuterJoinExec) proj.getChild();
+      HashLeftOuterJoinExec join = proj.getChild();
       NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
       proj.setChild(aJoin);
       exec = proj;
@@ -422,6 +426,7 @@ public class TestLeftOuterNLJoinExec {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec4");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr context =  analyzer.parse(QUERIES[4]);
     LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
 
@@ -432,7 +437,7 @@ public class TestLeftOuterNLJoinExec {
     //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
     ProjectionExec proj = (ProjectionExec) exec;
     if (proj.getChild() instanceof HashLeftOuterJoinExec) {
-      HashLeftOuterJoinExec join = (HashLeftOuterJoinExec) proj.getChild();
+      HashLeftOuterJoinExec join = proj.getChild();
       NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
       proj.setChild(aJoin);
       exec = proj;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index c97ed00..72462de 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -189,6 +189,7 @@ public class TestPhysicalPlanner {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateScanPlan");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(QUERIES[0]);
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode =plan.getRootBlock().getRoot();
@@ -218,6 +219,7 @@ public class TestPhysicalPlanner {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateScanWithFilterPlan");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(QUERIES[16]);
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode =plan.getRootBlock().getRoot();
@@ -703,6 +705,7 @@ public class TestPhysicalPlanner {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testUnionPlan");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr  context = analyzer.parse(QUERIES[0]);
     LogicalPlan plan = planner.createPlan(context);
     LogicalNode rootNode = optimizer.optimize(plan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
index d582e2b..b52a37a 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
@@ -46,7 +47,6 @@ import org.apache.tajo.LocalTajoTestingUtility;
 import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 // this is not a physical operator in itself, but it uses the HashLeftOuterJoinExec with switched inputs order
 public class TestRightOuterHashJoinExec {
@@ -229,6 +229,7 @@ public class TestRightOuterHashJoinExec {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestRightOuter_HashJoinExec0");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(QUERIES[0]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
 
@@ -269,6 +270,7 @@ public class TestRightOuterHashJoinExec {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestRightOuter_HashJoinExec1");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(QUERIES[1]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
 
@@ -309,6 +311,7 @@ public class TestRightOuterHashJoinExec {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestRightOuter_HashJoinExec2");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(QUERIES[2]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index 4aa1ba2..7e8c118 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -140,13 +140,15 @@ public class TestSortExec {
   public void testTAJO_946() {
     Schema schema = new Schema();
     schema.addColumn("l_orderkey", Type.INT8);
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
     Tuple s = new VTuple(1);
     s.put(0, DatumFactory.createInt8(0));
     Tuple e = new VTuple(1);
     e.put(0, DatumFactory.createInt8(6000000000l));
-    TupleRange expected = new TupleRange(schema, s, e);
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
     RangePartitionAlgorithm partitioner
-        = new UniformRangePartition(schema, expected, true);
+        = new UniformRangePartition(expected, sortSpecs, true);
     TupleRange [] ranges = partitioner.partition(967);
 
     TupleRange prev = null;

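Side note for readers tracking the API change in the hunk above: the range partitioner is now built from a SortSpec[] rather than a bare Schema. A minimal, self-contained sketch of the new usage follows, mirroring testTAJO_946; the wrapper class and main method are illustrative only and not part of the commit.

import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
import org.apache.tajo.engine.planner.UniformRangePartition;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleRange;
import org.apache.tajo.storage.VTuple;

public class RangePartitionSketch {
  public static void main(String[] args) {
    // One INT8 sort key, as in testTAJO_946 above.
    Schema schema = new Schema();
    schema.addColumn("l_orderkey", Type.INT8);
    SortSpec[] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);

    // Start and end keys of the total range to be partitioned.
    Tuple start = new VTuple(1);
    start.put(0, DatumFactory.createInt8(0));
    Tuple end = new VTuple(1);
    end.put(0, DatumFactory.createInt8(6000000000L));

    // New API: both the range and the partitioner take the sort specs,
    // so ascending/descending keys can be handled correctly.
    TupleRange totalRange = new TupleRange(sortSpecs, start, end);
    RangePartitionAlgorithm partitioner =
        new UniformRangePartition(totalRange, sortSpecs, true);
    TupleRange[] ranges = partitioner.partition(967);
    System.out.println("number of sub-ranges: " + ranges.length);
  }
}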
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
index 5e5c70c..b6e439a 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
@@ -103,4 +103,14 @@ public class TestSortQuery extends QueryTestCaseBase {
     assertResultSet(res);
     cleanupQuery(res);
   }
+
+  @Test
+  public final void testSortWithAscDescKeys() throws Exception {
+    executeDDL("create_table_with_asc_desc_keys.sql", "table2.tbl");
+
+    ResultSet res = executeQuery();
+    System.out.println(resultSetToString(res));
+    cleanupQuery(res);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
index 3c4d3c2..c6ec236 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
@@ -19,8 +19,8 @@
 package org.apache.tajo.engine.util;
 
 import org.apache.hadoop.fs.Path;
-import org.junit.Test;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
@@ -29,10 +29,7 @@ import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
 import org.apache.tajo.engine.planner.UniformRangePartition;
 import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.storage.*;
-import org.apache.tajo.worker.dataserver.HttpUtil;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Map;
+import org.junit.Test;
 
 import static org.junit.Assert.*;
 
@@ -80,6 +77,7 @@ public class TestTupleUtil {
     Tuple eTuple = new VTuple(7);
 
     Schema schema = new Schema();
+
     schema.addColumn("numByte", Type.BIT);
     schema.addColumn("numChar", Type.CHAR);
     schema.addColumn("numShort", Type.INT2);
@@ -88,6 +86,8 @@ public class TestTupleUtil {
     schema.addColumn("numFloat", Type.FLOAT4);
     schema.addColumn("numDouble", Type.FLOAT4);
 
+    SortSpec[] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
     sTuple.put(0, DatumFactory.createBit((byte) 44));
     sTuple.put(1, DatumFactory.createChar('a'));
     sTuple.put(2, DatumFactory.createInt2((short) 10));
@@ -104,7 +104,8 @@ public class TestTupleUtil {
     eTuple.put(5, DatumFactory.createFloat4(150));
     eTuple.put(6, DatumFactory.createFloat8(170));
 
-    RangePartitionAlgorithm partitioner = new UniformRangePartition(schema, new TupleRange(schema, sTuple, eTuple));
+    RangePartitionAlgorithm partitioner = new UniformRangePartition(new TupleRange(sortSpecs, sTuple, eTuple),
+        sortSpecs);
     TupleRange [] ranges = partitioner.partition(5);
     assertTrue(5 <= ranges.length);
     TupleComparator comp = new TupleComparator(schema, PlannerUtil.schemaToSortSpecs(schema));
@@ -117,119 +118,6 @@ public class TestTupleUtil {
   }
 
   @Test
-  public void testQueryToRange() throws UnsupportedEncodingException {
-    Schema schema = new Schema();
-    schema.addColumn("intval", Type.INT4);
-    schema.addColumn("floatval", Type.FLOAT4);
-
-    Tuple s = new VTuple(2);
-    s.put(0, DatumFactory.createInt4(5));
-    s.put(1, DatumFactory.createFloat4(10));
-
-    Tuple e = new VTuple(2);
-    e.put(0, DatumFactory.createInt4(10));
-    e.put(1, DatumFactory.createFloat4(20));
-
-    TupleRange expected = new TupleRange(schema, s, e);
-    int card = (int) TupleUtil.computeCardinality(schema, expected);
-    assertEquals(66, card);
-    int partNum = TupleUtil.getPartitions(schema, 5, expected).length;
-    assertEquals(5, partNum);
-
-    // TODO - partition should be improved to consider all fields
-    //partNum = TupleUtil.partition(schema, 10, expected).length;
-    //assertEquals(10, partNum);
-
-    String query = TupleUtil.rangeToQuery(schema, expected, true, false);
-
-    TupleRange range = TupleUtil.queryToRange(schema, query);
-    assertEquals(expected, range);
-
-    query = TupleUtil.rangeToQuery(schema, expected, true, true);
-    Map<String,String> params = HttpUtil.getParamsFromQuery(query);
-    assertTrue(params.containsKey("final"));
-    range = TupleUtil.queryToRange(schema, query);
-    assertEquals(expected, range);
-  }
-
-  @Test
-  public void testQueryToRangeWithOneRange() throws UnsupportedEncodingException {
-    Schema schema = new Schema();
-    schema.addColumn("partkey", Type.FLOAT4);
-
-    Tuple s = new VTuple(1);
-    s.put(0, DatumFactory.createFloat4(28082));
-    Tuple e = new VTuple(1);
-    e.put(0, DatumFactory.createFloat4(28082));
-
-    TupleRange expected = new TupleRange(schema, s, e);
-    int card = (int) TupleUtil.computeCardinality(schema, expected);
-    assertEquals(1, card);
-    int partNum = TupleUtil.getPartitions(schema, card, expected).length;
-    assertEquals(1, partNum);
-
-    String query = TupleUtil.rangeToQuery(schema, expected, true, false);
-
-    TupleRange range = TupleUtil.queryToRange(schema, query);
-    assertEquals(expected, range);
-
-    query = TupleUtil.rangeToQuery(schema, expected, true, true);
-    Map<String,String> params = HttpUtil.getParamsFromQuery(query);
-    assertTrue(params.containsKey("final"));
-    range = TupleUtil.queryToRange(schema, query);
-    assertEquals(expected, range);
-  }
-
-  @Test
-  /**
-   * It verifies NTA-805.
-   */
-  public void testRangeToQueryHeavyTest() throws UnsupportedEncodingException {
-    Schema schema = new Schema();
-    schema.addColumn("c_custkey", Type.INT4);
-    Tuple s = new VTuple(1);
-    s.put(0, DatumFactory.createInt4(4));
-    Tuple e = new VTuple(1);
-    e.put(0, DatumFactory.createInt4(149995));
-    TupleRange expected = new TupleRange(schema, s, e);
-    TupleRange [] ranges = TupleUtil.getPartitions(schema, 31, expected);
-
-    String query;
-    for (TupleRange range : ranges) {
-      query = TupleUtil.rangeToQuery(schema, range, true, false);
-      TupleRange result = TupleUtil.queryToRange(schema, query);
-      assertEquals(range, result);
-    }
-  }
-
-  @Test
-  /**
-   * It verifies NTA-807
-   */
-  public void testRangeToQueryTest() throws UnsupportedEncodingException {
-    Schema schema = new Schema();
-    schema.addColumn("l_returnflag", Type.TEXT);
-    schema.addColumn("l_linestatus", Type.TEXT);
-    Tuple s = new VTuple(2);
-    s.put(0, DatumFactory.createText("A"));
-    s.put(1, DatumFactory.createText("F"));
-    Tuple e = new VTuple(2);
-    e.put(0, DatumFactory.createText("R"));
-    e.put(1, DatumFactory.createText("O"));
-    TupleRange expected = new TupleRange(schema, s, e);
-
-    RangePartitionAlgorithm partitioner = new UniformRangePartition(schema, expected, true);
-    TupleRange [] ranges = partitioner.partition(31);
-
-    String query;
-    for (TupleRange range : ranges) {
-      query = TupleUtil.rangeToQuery(schema, range, true, false);
-      TupleRange result = TupleUtil.queryToRange(schema, query);
-      assertEquals(range, result);
-    }
-  }
-
-  @Test
   public void testBuildTupleFromPartitionPath() {
 
     Schema schema = new Schema();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 78e846d..b18f706 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -34,6 +34,7 @@ import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.physical.*;
 import org.apache.tajo.storage.*;
@@ -131,7 +132,7 @@ public class TestRangeRetrieverHandler {
 
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(),
         new FileFragment[] {frags[0]}, testDir);
-
+    ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(SORT_QUERY[0]);
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = optimizer.optimize(plan);
@@ -189,8 +190,8 @@ public class TestRangeRetrieverHandler {
       //assertTrue("[seek check " + (i) + " ]" , ("name_" + i).equals(tuple.get(1).asChars()));
     }
 
-    TupleRange totalRange = new TupleRange(keySchema, firstTuple, lastTuple);
-    UniformRangePartition partitioner = new UniformRangePartition(keySchema, totalRange, true);
+    TupleRange totalRange = new TupleRange(sortSpecs, firstTuple, lastTuple);
+    UniformRangePartition partitioner = new UniformRangePartition(totalRange, sortSpecs, true);
     TupleRange [] partitions = partitioner.partition(7);
 
     // The below is for testing RangeRetrieverHandler.
@@ -253,6 +254,7 @@ public class TestRangeRetrieverHandler {
     TaskAttemptContext
         ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(),
         new FileFragment[] {frags[0]}, testDir);
+    ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(SORT_QUERY[1]);
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = optimizer.optimize(plan);
@@ -307,8 +309,8 @@ public class TestRangeRetrieverHandler {
       assertTrue("[seek check " + (i) + " ]" , i == tuple.get(0).asInt4());
     }
 
-    TupleRange totalRange = new TupleRange(keySchema, lastTuple, firstTuple);
-    UniformRangePartition partitioner = new UniformRangePartition(keySchema, totalRange, true);
+    TupleRange totalRange = new TupleRange(sortSpecs, firstTuple, lastTuple);
+    UniformRangePartition partitioner = new UniformRangePartition(totalRange, sortSpecs, true);
     TupleRange [] partitions = partitioner.partition(25);
 
     File dataFile = new File((new Path(testDir, "output")).toUri());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/test/resources/dataset/TestSortQuery/table2.tbl
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/dataset/TestSortQuery/table2.tbl b/tajo-core/tajo-core-backend/src/test/resources/dataset/TestSortQuery/table2.tbl
new file mode 100644
index 0000000..bc237af
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/dataset/TestSortQuery/table2.tbl
@@ -0,0 +1,24 @@
+1,2132
+1,15635
+1,24027
+1,63700
+1,67310
+1,155190
+2,106170
+3,4297
+3,19036
+3,29380
+3,62143
+3,128449
+3,183095
+4,88035
+5,37531
+5,108570
+5,123927
+6,139636
+7,79251
+7,94780
+7,145243
+7,151894
+7,157238
+7,163073

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/test/resources/org/apache/tajo/jdbc/TestTajoResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/org/apache/tajo/jdbc/TestTajoResultSet.java b/tajo-core/tajo-core-backend/src/test/resources/org/apache/tajo/jdbc/TestTajoResultSet.java
new file mode 100644
index 0000000..92a8a6a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/org/apache/tajo/jdbc/TestTajoResultSet.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.jdbc;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestTajoResultSet {
+  @Test
+  public final void testFileNameComparator() {
+
+    Path[] paths = new Path [] {
+        new Path("hdfs://xtajox.com:9010/tmp/tajo-hadoop/staging/q_1391511584109_0001/RESULT/part-02-000104"),
+        new Path("hdfs://xtajox.com:9010/tmp/tajo-hadoop/staging/q_1391511584109_0001/RESULT/part-02-000000"),
+        new Path("hdfs://xtajox.com:9010/tmp/tajo-hadoop/staging/q_1391511584109_0001/RESULT/part-02-000105"),
+        new Path("hdfs://xtajox.com:9010/tmp/tajo-hadoop/staging/q_1391511584109_0001/RESULT/part-02-000001")
+    };
+
+    FileStatus [] fileStatuses = new FileStatus[paths.length];
+
+    for (int i = 0; i < paths.length; i++) {
+      fileStatuses[i] = mock(FileStatus.class);
+      when(fileStatuses[i].getPath()).thenReturn(paths[i]);
+    }
+
+    TajoResultSet.FileNameComparator comparator = new TajoResultSet.FileNameComparator();
+    Arrays.sort(fileStatuses, comparator);
+
+    FileStatus prev = null;
+    for (int i = 0; i < fileStatuses.length; i++) {
+      if (prev == null) {
+        prev = fileStatuses[i];
+      } else {
+        assertTrue(comparator.compare(prev, fileStatuses[i]) <= 0);
+      }
+
+      assertTrue(prev.getPath().getName().compareTo(fileStatuses[i].getPath().getName()) <= 0);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testOuterJoinAndCaseWhen1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testOuterJoinAndCaseWhen1.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testOuterJoinAndCaseWhen1.sql
index ccb58a5..a7ebef4 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testOuterJoinAndCaseWhen1.sql
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinQuery/testOuterJoinAndCaseWhen1.sql
@@ -6,4 +6,7 @@ select
   case when b.name is null then '9991231' else b.name end as c1,
   case when c.name is null then '9991231' else c.name end as c2
 from
-  table1 a left outer join table2 b on a.id = b.id left outer join table1 c on b.id = c.id;
\ No newline at end of file
+  table1 a left outer join table2 b on a.id = b.id left outer join table1 c on b.id = c.id
+order by
+  a.id,
+  a.name;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/test/resources/queries/TestSortQuery/create_table_with_asc_desc_keys.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestSortQuery/create_table_with_asc_desc_keys.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestSortQuery/create_table_with_asc_desc_keys.sql
new file mode 100644
index 0000000..3ffa2b3
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestSortQuery/create_table_with_asc_desc_keys.sql
@@ -0,0 +1 @@
+create external table table2 (col1 int8, col2 int8) using csv with ('csvfile.delimiter'=',') location ${table.path};
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/test/resources/queries/TestSortQuery/testSortWithAscDescKeys.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestSortQuery/testSortWithAscDescKeys.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestSortQuery/testSortWithAscDescKeys.sql
new file mode 100644
index 0000000..2eb9d1c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestSortQuery/testSortWithAscDescKeys.sql
@@ -0,0 +1 @@
+select col1, col2 from table2 order by col1, col2 desc;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
index 1531fad..8054a40 100644
--- a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
+++ b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
@@ -51,7 +51,6 @@ import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
-import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.index.bst.BSTIndex;
 import org.apache.tajo.util.TajoIdUtils;
 import org.jboss.netty.bootstrap.ServerBootstrap;
@@ -608,7 +607,7 @@ public class PullServerAuxService extends AuxiliaryService {
       startOffset = idxReader.find(start);
     } catch (IOException ioe) {
       LOG.error("State Dump (the requested range: "
-          + new TupleRange(keySchema, start, end) + ", idx min: "
+          + "[" + start + ", " + end +")" + ", idx min: "
           + idxReader.getFirstKey() + ", idx max: "
           + idxReader.getLastKey());
       throw ioe;
@@ -620,7 +619,7 @@ public class PullServerAuxService extends AuxiliaryService {
       }
     } catch (IOException ioe) {
       LOG.error("State Dump (the requested range: "
-          + new TupleRange(keySchema, start, end) + ", idx min: "
+          + "[" + start + ", " + end +")" + ", idx min: "
           + idxReader.getFirstKey() + ", idx max: "
           + idxReader.getLastKey());
       throw ioe;
@@ -633,7 +632,7 @@ public class PullServerAuxService extends AuxiliaryService {
         startOffset = idxReader.find(start, true);
       } catch (IOException ioe) {
         LOG.error("State Dump (the requested range: "
-            + new TupleRange(keySchema, start, end) + ", idx min: "
+            + "[" + start + ", " + end +")" + ", idx min: "
             + idxReader.getFirstKey() + ", idx max: "
             + idxReader.getLastKey());
         throw ioe;
@@ -643,7 +642,7 @@ public class PullServerAuxService extends AuxiliaryService {
     if (startOffset == -1) {
       throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
           "State Dump (the requested range: "
-          + new TupleRange(keySchema, start, end) + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+          + "[" + start + ", " + end +")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
           + idxReader.getLastKey());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 1b0cadc..da34ac7 100644
--- a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -48,7 +48,6 @@ import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
-import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.index.bst.BSTIndex;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.buffer.ChannelBuffers;
@@ -587,13 +586,6 @@ public class TajoPullServerService extends AbstractService {
           + ", decoded byte size: " + endBytes.length, t);
     }
 
-
-    if(!comparator.isAscendingFirstKey()) {
-      Tuple tmpKey = start;
-      start = end;
-      end = tmpKey;
-    }
-
     LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end +
         (last ? ", last=true" : "") + ")");
 
@@ -615,7 +607,7 @@ public class TajoPullServerService extends AbstractService {
       startOffset = idxReader.find(start);
     } catch (IOException ioe) {
       LOG.error("State Dump (the requested range: "
-          + new TupleRange(keySchema, start, end) + ", idx min: "
+          + "[" + start + ", " + end +")" + ", idx min: "
           + idxReader.getFirstKey() + ", idx max: "
           + idxReader.getLastKey());
       throw ioe;
@@ -627,7 +619,7 @@ public class TajoPullServerService extends AbstractService {
       }
     } catch (IOException ioe) {
       LOG.error("State Dump (the requested range: "
-          + new TupleRange(keySchema, start, end) + ", idx min: "
+          + "[" + start + ", " + end +")" + ", idx min: "
           + idxReader.getFirstKey() + ", idx max: "
           + idxReader.getLastKey());
       throw ioe;
@@ -640,7 +632,7 @@ public class TajoPullServerService extends AbstractService {
         startOffset = idxReader.find(start, true);
       } catch (IOException ioe) {
         LOG.error("State Dump (the requested range: "
-            + new TupleRange(keySchema, start, end) + ", idx min: "
+            + "[" + start + ", " + end +")" + ", idx min: "
             + idxReader.getFirstKey() + ", idx max: "
             + idxReader.getLastKey());
         throw ioe;
@@ -650,7 +642,7 @@ public class TajoPullServerService extends AbstractService {
     if (startOffset == -1) {
       throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
           "State Dump (the requested range: "
-          + new TupleRange(keySchema, start, end) + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+          + "[" + start + ", " + end +")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
           + idxReader.getLastKey());
     }
 
@@ -660,6 +652,8 @@ public class TajoPullServerService extends AbstractService {
       endOffset = data.length();
     }
 
+    idxReader.close();
+
     FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
     LOG.info("Retrieve File Chunk: " + chunk);
     return chunk;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
index e4439f3..1d4963d 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -18,18 +18,15 @@
 
 package org.apache.tajo.storage;
 
-import com.google.common.collect.Lists;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
 
 import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -46,25 +43,23 @@ public class MergeScanner implements Scanner {
   private boolean selectable = false;
   private Schema target;
 
-  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, Collection<FileFragment> rawFragmentList)
+  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<FileFragment> rawFragmentList)
       throws IOException {
     this(conf, schema, meta, rawFragmentList, schema);
   }
 
-  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, Collection<FileFragment> rawFragmentList,
+  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<FileFragment> rawFragmentList,
                       Schema target)
       throws IOException {
     this.conf = conf;
     this.schema = schema;
     this.meta = meta;
-    this.fragments = Lists.newArrayList();
-    for (Fragment f : rawFragmentList) {
-      fragments.add((FileFragment) f);
-    }
-    Collections.sort(fragments);
-
     this.target = target;
+
+    // it should keep the input order. Otherwise, it causes wrong result of sort queries.
+    this.fragments = ImmutableList.copyOf(rawFragmentList);
     this.reset();
+
     if (currentScanner != null) {
       this.projectable = currentScanner.isProjectable();
       this.selectable = currentScanner.isSelectable();
@@ -118,9 +113,6 @@ public class MergeScanner implements Scanner {
       currentScanner.close();
     }
     iterator = null;
-    if(fragments != null) {
-      fragments.clear();
-    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index 73b3692..c3a7525 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -71,10 +71,14 @@ public class RawFile {
 
     public void init() throws IOException {
       File file;
-      if (path.toUri().getScheme() != null) {
-        file = new File(path.toUri());
-      } else {
-        file = new File(path.toString());
+      try {
+        if (path.toUri().getScheme() != null) {
+          file = new File(path.toUri());
+        } else {
+          file = new File(path.toString());
+        }
+      } catch (IllegalArgumentException iae) {
+        throw new IOException(iae);
       }
 
       fis = new FileInputStream(file);
@@ -405,10 +409,14 @@ public class RawFile {
 
     public void init() throws IOException {
       File file;
-      if (path.toUri().getScheme() != null) {
-        file = new File(path.toUri());
-      } else {
-        file = new File(path.toString());
+      try {
+        if (path.toUri().getScheme() != null) {
+          file = new File(path.toUri());
+        } else {
+          file = new File(path.toString());
+        }
+      } catch (IllegalArgumentException iae) {
+        throw new IOException(iae);
       }
 
       randomAccessFile = new RandomAccessFile(file, "rw");

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
index 69c1c04..30f2810 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
@@ -46,6 +46,10 @@ public class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComp
   private Datum right;
   private int compVal;
 
+  /**
+   * @param schema The schema of input tuples
+   * @param sortKeys The description of sort keys
+   */
   public TupleComparator(Schema schema, SortSpec[] sortKeys) {
     Preconditions.checkArgument(sortKeys.length > 0, 
         "At least one sort key must be specified.");

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
index 7d0f674..1309116 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
@@ -19,39 +19,33 @@
 package org.apache.tajo.storage;
 
 import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
 
 import java.util.Comparator;
 
+/**
+ * It represents a pair of start and end tuples.
+ */
 public class TupleRange implements Comparable<TupleRange> {
-  private final Schema schema;
   private final Tuple start;
   private final Tuple end;
   private final TupleComparator comp;
 
-  public TupleRange(final Schema schema, final Tuple start, final Tuple end) {
-    this.comp = new TupleComparator(schema, schemaToSortSpecs(schema));
+  public TupleRange(final SortSpec [] sortSpecs, final Tuple start, final Tuple end) {
+    this.comp = new TupleComparator(sortSpecsToSchema(sortSpecs), sortSpecs);
     // if there is only one value, start == end
-    Preconditions.checkArgument(comp.compare(start, end) <= 0, ("start=" + start) + ", end=" + end);
-    this.schema = schema;
     this.start = start;
     this.end = end;
   }
 
-  public static SortSpec[] schemaToSortSpecs(Schema schema) {
-    SortSpec[] specs = new SortSpec[schema.getColumnNum()];
-
-    for (int i = 0; i < schema.getColumnNum(); i++) {
-      specs[i] = new SortSpec(schema.getColumn(i), true, false);
+  public static Schema sortSpecsToSchema(SortSpec[] sortSpecs) {
+    Schema schema = new Schema();
+    for (SortSpec spec : sortSpecs) {
+      schema.addColumn(spec.getSortKey());
     }
 
-    return specs;
-  }
-
-  public final Schema getSchema() {
-    return this.schema;
+    return schema;
   }
 
   public final Tuple getStart() {


[2/2] git commit: TAJO-584: Improve distributed merge sort.

Posted by hy...@apache.org.
TAJO-584: Improve distributed merge sort.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/214b9741
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/214b9741
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/214b9741

Branch: refs/heads/master
Commit: 214b9741a510d1c2013e0dd494ab66017962367a
Parents: 4179a7c
Author: Hyunsik Choi <hy...@apache.org>
Authored: Tue Feb 11 01:48:15 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Tue Feb 11 01:48:15 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../exception/AlreadyExistsTableException.java  |   2 +-
 .../org/apache/tajo/jdbc/TajoResultSet.java     |  24 +-
 .../tajo/engine/planner/BaseAlgebraVisitor.java |  17 +-
 .../apache/tajo/engine/planner/LogicalPlan.java |   2 +-
 .../engine/planner/PhysicalPlannerImpl.java     | 149 ++++++---
 .../apache/tajo/engine/planner/PlannerUtil.java |  10 +
 .../engine/planner/PreLogicalPlanVerifier.java  |  33 +-
 .../engine/planner/RangePartitionAlgorithm.java |  86 +++--
 .../engine/planner/UniformRangePartition.java   | 160 +++++++---
 .../engine/planner/global/GlobalPlanner.java    |   2 +
 .../planner/physical/ExternalSortExec.java      |  77 +++--
 .../planner/physical/UnaryPhysicalExec.java     |  12 +-
 .../org/apache/tajo/engine/utils/TupleUtil.java | 316 +------------------
 .../tajo/master/querymaster/Repartitioner.java  |   8 +-
 .../tajo/worker/RangeRetrieverHandler.java      |   9 +-
 .../main/java/org/apache/tajo/worker/Task.java  |   2 +-
 .../planner/TestUniformRangePartition.java      |  68 ++--
 .../planner/physical/TestBSTIndexExec.java      |   3 +-
 .../planner/physical/TestExternalSortExec.java  |   6 +-
 .../physical/TestLeftOuterHashJoinExec.java     |   4 +
 .../physical/TestLeftOuterNLJoinExec.java       |  21 +-
 .../planner/physical/TestPhysicalPlanner.java   |   3 +
 .../physical/TestRightOuterHashJoinExec.java    |   5 +-
 .../engine/planner/physical/TestSortExec.java   |   6 +-
 .../apache/tajo/engine/query/TestSortQuery.java |  10 +
 .../apache/tajo/engine/util/TestTupleUtil.java  | 126 +-------
 .../tajo/worker/TestRangeRetrieverHandler.java  |  12 +-
 .../resources/dataset/TestSortQuery/table2.tbl  |  24 ++
 .../org/apache/tajo/jdbc/TestTajoResultSet.java |  63 ++++
 .../TestJoinQuery/testOuterJoinAndCaseWhen1.sql |   5 +-
 .../create_table_with_asc_desc_keys.sql         |   1 +
 .../TestSortQuery/testSortWithAscDescKeys.sql   |   1 +
 .../tajo/pullserver/PullServerAuxService.java   |   9 +-
 .../tajo/pullserver/TajoPullServerService.java  |  18 +-
 .../org/apache/tajo/storage/MergeScanner.java   |  22 +-
 .../java/org/apache/tajo/storage/RawFile.java   |  24 +-
 .../apache/tajo/storage/TupleComparator.java    |   4 +
 .../org/apache/tajo/storage/TupleRange.java     |  26 +-
 39 files changed, 680 insertions(+), 692 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8dc5ee5..d79ec91 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -130,6 +130,8 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-584: Improve distributed merge sort. (hyunsik)
+
     TAJO-36: Improve ExternalSortExec with N-merge sort and final pass
     omission. (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsTableException.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsTableException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsTableException.java
index 98c04b5..a2e3a6a 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsTableException.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsTableException.java
@@ -26,6 +26,6 @@ public class AlreadyExistsTableException extends CatalogException {
 	}
 
 	public AlreadyExistsTableException(String tableName) {
-		super("Already Exists Table: "+tableName);
+		super("Already exists table: "+tableName);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
index 3977d76..942107c 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.jdbc;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -25,7 +26,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.storage.FileScanner;
@@ -37,7 +37,6 @@ import org.apache.tajo.storage.fragment.FileFragment;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 
@@ -63,7 +62,7 @@ public class TajoResultSet extends TajoResultSetBase {
       fs = FileScanner.getFileSystem(conf, desc.getPath());
       this.totalRow = desc.getStats() != null ? desc.getStats().getNumRows() : 0;
 
-      Collection<FileFragment> frags = getFragments(desc.getMeta(), desc.getPath());
+      List<FileFragment> frags = getFragments(desc.getPath());
       scanner = new MergeScanner(conf, schema, desc.getMeta(), frags);
     }
     init();
@@ -75,23 +74,30 @@ public class TajoResultSet extends TajoResultSetBase {
     curRow = 0;
   }
 
-  static class FileNameComparator implements Comparator<FileStatus> {
+  public static class FileNameComparator implements Comparator<FileStatus> {
 
     @Override
     public int compare(FileStatus f1, FileStatus f2) {
-      return f2.getPath().getName().compareTo(f1.getPath().getName());
+      return f1.getPath().getName().compareTo(f2.getPath().getName());
     }
   }
 
-  private Collection<FileFragment> getFragments(TableMeta meta, Path tablePath)
+  private List<FileFragment> getFragments(Path tablePath)
       throws IOException {
-    List<FileFragment> fraglist = Lists.newArrayList();
+    List<FileFragment> fragments = Lists.newArrayList();
     FileStatus[] files = fs.listStatus(tablePath, new PathFilter() {
       @Override
       public boolean accept(Path path) {
         return path.getName().charAt(0) != '.';
       }
     });
+
+
+    // The files must be sorted in ascending order of their file names
+    // in order to preserve the ordering of the overall sort result.
+    // This is because our distributed sort algorithm outputs
+    // a sequence of data files whose name order follows the global sort order,
+    // and the rows within each file are already sorted.
     Arrays.sort(files, new FileNameComparator());
 
     String tbname = tablePath.getName();
@@ -99,9 +105,9 @@ public class TajoResultSet extends TajoResultSetBase {
       if (files[i].getLen() == 0) {
         continue;
       }
-      fraglist.add(new FileFragment(tbname + "_" + i, files[i].getPath(), 0l, files[i].getLen()));
+      fragments.add(new FileFragment(tbname + "_" + i, files[i].getPath(), 0l, files[i].getLen()));
     }
-    return fraglist;
+    return ImmutableList.copyOf(fragments);
   }
 
   @Override

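The change above reads result files in ascending file-name order so that the client sees the globally sorted result of a distributed merge sort. Below is a minimal, self-contained sketch (plain Java collections, not Tajo code; file names and values are made up) of why that ordering matters: each output file is internally sorted and the file names follow the range order, so concatenating the files by name yields a sorted stream with no further work.

import java.util.*;

// Minimal sketch: reading per-range sorted files in file-name order
// reproduces the global sort order without re-sorting.
public class SortedFileOrderSketch {
  public static void main(String[] args) {
    // hypothetical output files: name -> rows sorted within the file
    Map<String, int[]> files = new HashMap<>();
    files.put("part-002", new int[]{20, 25, 30});
    files.put("part-000", new int[]{1, 3, 7});
    files.put("part-001", new int[]{8, 10, 15});

    // analogous to Arrays.sort(files, new FileNameComparator())
    List<String> names = new ArrayList<>(files.keySet());
    Collections.sort(names);

    List<Integer> result = new ArrayList<>();
    for (String name : names) {
      for (int v : files.get(name)) {
        result.add(v);
      }
    }
    System.out.println(result); // [1, 3, 7, 8, 10, 15, 20, 25, 30]
  }
}
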
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
index 6cc6fd0..25ee316 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
@@ -269,14 +269,19 @@ public class BaseAlgebraVisitor<CONTEXT, RESULT> implements AlgebraVisitor<CONTE
   @Override
   public RESULT visitProjection(CONTEXT ctx, Stack<Expr> stack, Projection expr) throws PlanningException {
     stack.push(expr);
-    if (!expr.isAllProjected()) {
-      for (NamedExpr target : expr.getNamedExprs()) {
-        visit(ctx, stack, target);
+    try {
+      if (!expr.isAllProjected()) {
+        for (NamedExpr target : expr.getNamedExprs()) {
+          visit(ctx, stack, target);
+        }
       }
+      if (expr.hasChild()) {
+        return visit(ctx, stack, expr.getChild());
+      }
+    } finally {
+      stack.pop();
     }
-    RESULT result = visit(ctx, stack, expr.getChild());
-    stack.pop();
-    return result;
+    return null;
   }
 
   @Override

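The BaseAlgebraVisitor change wraps the recursive visit in try/finally so the expression stack is always popped, even when a child visit throws or the method returns early. A minimal sketch of that push/try/finally/pop discipline follows (hypothetical Node type, not the Tajo visitor API).

import java.util.Stack;

// Minimal sketch: the finally block keeps the traversal stack consistent
// regardless of early returns or exceptions inside the recursive call.
public class VisitorStackSketch {
  static class Node {
    final String name;
    final Node child;
    Node(String name, Node child) { this.name = name; this.child = child; }
  }

  private final Stack<Node> stack = new Stack<>();

  String visit(Node node) {
    stack.push(node);
    try {
      if (node.child != null) {
        return visit(node.child);   // early return: finally still pops
      }
      return node.name;
    } finally {
      stack.pop();
    }
  }

  public static void main(String[] args) {
    VisitorStackSketch v = new VisitorStackSketch();
    System.out.println(v.visit(new Node("root", new Node("leaf", null)))); // leaf
    System.out.println(v.stack.isEmpty());                                 // true
  }
}
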
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index 6afaaad..9890943 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -245,7 +245,7 @@ public class LogicalPlan {
 
       // Trying to find the column within the current block
 
-      if (block.currentNode != null) {
+      if (block.currentNode != null && block.currentNode.getInSchema() != null) {
         Column found = block.currentNode.getInSchema().getColumn(columnRef.getCanonicalName());
         if (found != null) {
           return found;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index dfa2e40..5583efd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -35,6 +35,7 @@ import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.physical.*;
 import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.fragment.FileFragment;
@@ -45,6 +46,7 @@ import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Stack;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType;
@@ -73,7 +75,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     PhysicalExec execPlan;
 
     try {
-      execPlan = createPlanRecursive(context, logicalPlan);
+      execPlan = createPlanRecursive(context, logicalPlan, new Stack<LogicalNode>());
       if (execPlan instanceof StoreTableExec
           || execPlan instanceof RangeShuffleFileWriteExec
           || execPlan instanceof HashShuffleFileWriteExec
@@ -103,7 +105,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     return outExecPlan;
   }
 
-  private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, LogicalNode logicalNode) throws IOException {
+  private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, LogicalNode logicalNode, Stack<LogicalNode> stack)
+      throws IOException {
     PhysicalExec leftExec;
     PhysicalExec rightExec;
 
@@ -111,7 +114,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
       case ROOT:
         LogicalRootNode rootNode = (LogicalRootNode) logicalNode;
-        return createPlanRecursive(ctx, rootNode.getChild());
+        stack.push(rootNode);
+        leftExec = createPlanRecursive(ctx, rootNode.getChild(), stack);
+        stack.pop();
+        return leftExec;
 
       case EXPRS:
         EvalExprNode evalExpr = (EvalExprNode) logicalNode;
@@ -121,61 +127,81 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       case INSERT:
       case STORE:
         StoreTableNode storeNode = (StoreTableNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, storeNode.getChild());
+        stack.push(storeNode);
+        leftExec = createPlanRecursive(ctx, storeNode.getChild(), stack);
+        stack.pop();
         return createStorePlan(ctx, storeNode, leftExec);
 
       case SELECTION:
         SelectionNode selNode = (SelectionNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, selNode.getChild());
+        stack.push(selNode);
+        leftExec = createPlanRecursive(ctx, selNode.getChild(), stack);
+        stack.pop();
         return new SelectionExec(ctx, selNode, leftExec);
 
       case PROJECTION:
         ProjectionNode prjNode = (ProjectionNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, prjNode.getChild());
+        stack.push(prjNode);
+        leftExec = createPlanRecursive(ctx, prjNode.getChild(), stack);
+        stack.pop();
         return new ProjectionExec(ctx, prjNode, leftExec);
 
       case TABLE_SUBQUERY: {
         TableSubQueryNode subQueryNode = (TableSubQueryNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, subQueryNode.getSubQuery());
-        ProjectionExec projectionExec = new ProjectionExec(ctx, subQueryNode, leftExec);
-        return projectionExec;
+        stack.push(subQueryNode);
+        leftExec = createPlanRecursive(ctx, subQueryNode.getSubQuery(), stack);
+        stack.pop();
+        return new ProjectionExec(ctx, subQueryNode, leftExec);
+
       }
 
       case PARTITIONS_SCAN:
       case SCAN:
-        leftExec = createScanPlan(ctx, (ScanNode) logicalNode);
+        leftExec = createScanPlan(ctx, (ScanNode) logicalNode, stack);
         return leftExec;
 
       case GROUP_BY:
         GroupbyNode grpNode = (GroupbyNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, grpNode.getChild());
+        stack.push(grpNode);
+        leftExec = createPlanRecursive(ctx, grpNode.getChild(), stack);
+        stack.pop();
         return createGroupByPlan(ctx, grpNode, leftExec);
 
       case HAVING:
         HavingNode havingNode = (HavingNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, havingNode.getChild());
+        stack.push(havingNode);
+        leftExec = createPlanRecursive(ctx, havingNode.getChild(), stack);
+        stack.pop();
         return new HavingExec(ctx, havingNode, leftExec);
 
       case SORT:
         SortNode sortNode = (SortNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, sortNode.getChild());
+        stack.push(sortNode);
+        leftExec = createPlanRecursive(ctx, sortNode.getChild(), stack);
+        stack.pop();
         return createSortPlan(ctx, sortNode, leftExec);
 
       case JOIN:
         JoinNode joinNode = (JoinNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, joinNode.getLeftChild());
-        rightExec = createPlanRecursive(ctx, joinNode.getRightChild());
+        stack.push(joinNode);
+        leftExec = createPlanRecursive(ctx, joinNode.getLeftChild(), stack);
+        rightExec = createPlanRecursive(ctx, joinNode.getRightChild(), stack);
+        stack.pop();
         return createJoinPlan(ctx, joinNode, leftExec, rightExec);
 
       case UNION:
         UnionNode unionNode = (UnionNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, unionNode.getLeftChild());
-        rightExec = createPlanRecursive(ctx, unionNode.getRightChild());
+        stack.push(unionNode);
+        leftExec = createPlanRecursive(ctx, unionNode.getLeftChild(), stack);
+        rightExec = createPlanRecursive(ctx, unionNode.getRightChild(), stack);
+        stack.pop();
         return new UnionExec(ctx, leftExec, rightExec);
 
       case LIMIT:
         LimitNode limitNode = (LimitNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, limitNode.getChild());
+        stack.push(limitNode);
+        leftExec = createPlanRecursive(ctx, limitNode.getChild(), stack);
+        stack.pop();
         return new LimitExec(ctx, limitNode.getInSchema(),
             limitNode.getOutSchema(), leftExec, limitNode);
 
@@ -745,41 +771,65 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     return new SortBasedColPartitionStoreExec(context, storeTableNode, sortExec);
   }
 
-  public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode) throws IOException {
-    Preconditions.checkNotNull(ctx.getTable(scanNode.getCanonicalName()),
-        "Error: There is no table matched to %s", scanNode.getCanonicalName() + "(" + scanNode.getTableName() + ")");
-
+  private boolean checkIfSortEquivalance(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node) {
     Enforcer enforcer = ctx.getEnforcer();
+    List<EnforceProperty> property = enforcer.getEnforceProperties(EnforceType.SORTED_INPUT);
+    if (property != null && property.size() > 0 && node.peek().getType() == NodeType.SORT) {
+      SortNode sortNode = (SortNode) node.peek();
+      TajoWorkerProtocol.SortedInputEnforce sortEnforcer = property.get(0).getSortedInput();
+
+      boolean condition = scanNode.getTableName().equals(sortEnforcer.getTableName());
+      SortSpec [] sortSpecs = PlannerUtil.convertSortSpecs(sortEnforcer.getSortSpecsList());
+      return condition && TUtil.checkEquals(sortNode.getSortKeys(), sortSpecs);
+    } else {
+      return false;
+    }
+  }
 
-    // check if this table is broadcasted one or not.
-    boolean broadcastFlag = false;
-    if (enforcer != null && enforcer.hasEnforceProperty(EnforceType.BROADCAST)) {
-      List<EnforceProperty> properties = enforcer.getEnforceProperties(EnforceType.BROADCAST);
-      for (EnforceProperty property : properties) {
-        broadcastFlag |= scanNode.getCanonicalName().equals(property.getBroadcast().getTableName());
+  public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node)
+      throws IOException {
+    Preconditions.checkNotNull(ctx.getTable(scanNode.getCanonicalName()),
+        "Error: There is no table matched to %s", scanNode.getCanonicalName() + "(" + scanNode.getTableName() + ")");    
+
+    // check if the input is sorted in the same order as the subsequent sort operator.
+    // TODO - it works only if input files are raw files. We should check the file format.
+    // Since the default intermediate file format is a raw file, it is not a problem right now.
+    if (checkIfSortEquivalance(ctx, scanNode, node)) {
+      FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
+      return new ExternalSortExec(ctx, sm, (SortNode) node.peek(), fragments);
+    } else {
+      Enforcer enforcer = ctx.getEnforcer();
+
+      // check if this table is broadcasted one or not.
+      boolean broadcastFlag = false;
+      if (enforcer != null && enforcer.hasEnforceProperty(EnforceType.BROADCAST)) {
+        List<EnforceProperty> properties = enforcer.getEnforceProperties(EnforceType.BROADCAST);
+        for (EnforceProperty property : properties) {
+          broadcastFlag |= scanNode.getCanonicalName().equals(property.getBroadcast().getTableName());
+        }
       }
-    }
 
-    if (scanNode instanceof PartitionedTableScanNode
-        && ((PartitionedTableScanNode)scanNode).getInputPaths() != null &&
-        ((PartitionedTableScanNode)scanNode).getInputPaths().length > 0) {
+      if (scanNode instanceof PartitionedTableScanNode
+          && ((PartitionedTableScanNode)scanNode).getInputPaths() != null &&
+          ((PartitionedTableScanNode)scanNode).getInputPaths().length > 0) {
 
-      if (scanNode instanceof PartitionedTableScanNode) {
-        if (broadcastFlag) {
-          PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode;
-          List<FileFragment> fileFragments = TUtil.newList();
-          for (Path path : partitionedTableScanNode.getInputPaths()) {
-            fileFragments.addAll(TUtil.newList(sm.split(scanNode.getCanonicalName(), path)));
-          }
+        if (scanNode instanceof PartitionedTableScanNode) {
+          if (broadcastFlag) {
+            PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode;
+            List<FileFragment> fileFragments = TUtil.newList();
+            for (Path path : partitionedTableScanNode.getInputPaths()) {
+              fileFragments.addAll(TUtil.newList(sm.split(scanNode.getCanonicalName(), path)));
+            }
 
-          return new PartitionMergeScanExec(ctx, sm, scanNode,
-              FragmentConvertor.toFragmentProtoArray(fileFragments.toArray(new FileFragment[fileFragments.size()])));
+            return new PartitionMergeScanExec(ctx, sm, scanNode,
+                FragmentConvertor.toFragmentProtoArray(fileFragments.toArray(new FileFragment[fileFragments.size()])));
+          }
         }
       }
-    }
 
-    FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
-    return new SeqScanExec(ctx, sm, scanNode, fragments);
+      FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
+      return new SeqScanExec(ctx, sm, scanNode, fragments);
+    }
   }
 
   public PhysicalExec createGroupByPlan(TaskAttemptContext context,GroupbyNode groupbyNode, PhysicalExec subOp)
@@ -858,6 +908,17 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
   public PhysicalExec createSortPlan(TaskAttemptContext context, SortNode sortNode,
                                      PhysicalExec child) throws IOException {
+
+    // check if it is a distributed merge sort
+    // If so, it does not need to create a sort executor because
+    // the sort executor was already created during scan planning
+    if (child instanceof SortExec) {
+      SortExec childSortExec = (SortExec) child;
+      if (TUtil.checkEquals(sortNode.getSortKeys(), childSortExec.getSortSpecs())) {
+        return child;
+      }
+    }
+
     Enforcer enforcer = context.getEnforcer();
     EnforceProperty property = getAlgorithmEnforceProperty(enforcer, sortNode);
     if (property != null) {

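The PhysicalPlannerImpl change above works in two steps: if the enforcer says the scan's input is already sorted on the same keys as the enclosing SortNode, the scan is planned as an ExternalSortExec over the pre-sorted fragments (which then only needs to merge), and createSortPlan later returns that child unchanged instead of adding a second sort. The sketch below illustrates only the equivalence test itself, using hypothetical types rather than Tajo's SortSpec and planner classes.

import java.util.Arrays;

// Minimal sketch: a sort is redundant when the keys the input is already
// sorted by are identical (column and direction) to the keys it asks for.
public class SortEquivalenceSketch {
  // stand-in for a sort key: column name plus sort direction
  static final class Key {
    final String column;
    final boolean ascending;
    Key(String column, boolean ascending) { this.column = column; this.ascending = ascending; }
    @Override public boolean equals(Object o) {
      if (!(o instanceof Key)) return false;
      Key k = (Key) o;
      return column.equals(k.column) && ascending == k.ascending;
    }
    @Override public int hashCode() { return column.hashCode() * 31 + (ascending ? 1 : 0); }
  }

  static boolean sortEquivalent(Key[] enforcedInputOrder, Key[] requestedOrder) {
    return Arrays.equals(enforcedInputOrder, requestedOrder);
  }

  public static void main(String[] args) {
    Key[] input    = { new Key("l_orderkey", true), new Key("l_linenumber", false) };
    Key[] sortNode = { new Key("l_orderkey", true), new Key("l_linenumber", false) };
    System.out.println(sortEquivalent(input, sortNode)); // true -> reuse the pre-sorted input
  }
}
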
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index b59cdda..9ada2ee 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -30,6 +30,7 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.exception.InvalidQueryException;
@@ -749,4 +750,13 @@ public class PlannerUtil {
     }
     return names;
   }
+
+  public static SortSpec [] convertSortSpecs(Collection<CatalogProtos.SortSpecProto> sortSpecProtos) {
+    SortSpec [] sortSpecs = new SortSpec[sortSpecProtos.size()];
+    int i = 0;
+    for (CatalogProtos.SortSpecProto proto : sortSpecProtos) {
+      sortSpecs[i++] = new SortSpec(proto);
+    }
+    return sortSpecs;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
index 1843b5a..fe93f46 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
@@ -58,11 +58,11 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationStat
 
   @Override
   public Expr visitRelation(VerificationState state, Stack<Expr> stack, Relation expr) throws PlanningException {
-    checkRelationExistence(state, expr.getName());
+    assertRelationExistence(state, expr.getName());
     return expr;
   }
 
-  private boolean checkRelationExistence(VerificationState state, String name) {
+  private boolean assertRelationExistence(VerificationState state, String name) {
     if (!catalog.existsTable(name)) {
       state.addVerification(String.format("relation \"%s\" does not exist", name));
       return false;
@@ -70,15 +70,34 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationStat
     return true;
   }
 
+  private boolean assertRelationNoExistence(VerificationState state, String name) {
+    if (catalog.existsTable(name)) {
+      state.addVerification(String.format("relation \"%s\" already exists", name));
+      return false;
+    }
+    return true;
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Data Definition Language Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public Expr visitCreateTable(VerificationState state, Stack<Expr> stack, CreateTable expr) throws PlanningException {
+    super.visitCreateTable(state, stack, expr);
+    assertRelationNoExistence(state, expr.getTableName());
+    return expr;
+  }
+
   ///////////////////////////////////////////////////////////////////////////////////////////////////////////
   // Insert or Update Section
   ///////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-  public Expr visitInsert(VerificationState context, Stack<Expr> stack, Insert expr) throws PlanningException {
-    Expr child = super.visitInsert(context, stack, expr);
+  public Expr visitInsert(VerificationState state, Stack<Expr> stack, Insert expr) throws PlanningException {
+    Expr child = super.visitInsert(state, stack, expr);
 
     if (expr.hasTableName()) {
-      checkRelationExistence(context, expr.getTableName());
+      assertRelationExistence(state, expr.getTableName());
     }
 
     if (child != null && child.getType() == OpType.Projection) {
@@ -88,9 +107,9 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationStat
         int targetColumnNum = expr.getTargetColumns().length;
 
         if (targetColumnNum > projectColumnNum)  {
-          context.addVerification("INSERT has more target columns than expressions");
+          state.addVerification("INSERT has more target columns than expressions");
         } else if (targetColumnNum < projectColumnNum) {
-          context.addVerification("INSERT has more expressions than target columns");
+          state.addVerification("INSERT has more expressions than target columns");
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
index fc56b55..5bff857 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.engine.planner;
 
 import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.storage.Tuple;
@@ -28,7 +28,7 @@ import org.apache.tajo.storage.TupleRange;
 import java.math.BigDecimal;
 
 public abstract class RangePartitionAlgorithm {
-  protected Schema schema;
+  protected SortSpec [] sortSpecs;
   protected TupleRange range;
   protected final BigDecimal totalCard;
   /** true if the end of the range is inclusive. Otherwise, it should be false. */
@@ -36,15 +36,15 @@ public abstract class RangePartitionAlgorithm {
 
   /**
    *
-   * @param schema the schema of the range tuples
-   * @param range range to be partition
+   * @param sortSpecs The array of sort keys
+   * @param totalRange The total range to be partition
    * @param inclusive true if the end of the range is inclusive. Otherwise, false.
    */
-  public RangePartitionAlgorithm(Schema schema, TupleRange range, boolean inclusive) {
-    this.schema = schema;
-    this.range = range;
+  public RangePartitionAlgorithm(SortSpec [] sortSpecs, TupleRange totalRange, boolean inclusive) {
+    this.sortSpecs = sortSpecs;
+    this.range = totalRange;
     this.inclusive = inclusive;
-    this.totalCard = computeCardinalityForAllColumns(schema, range, inclusive);
+    this.totalCard = computeCardinalityForAllColumns(sortSpecs, totalRange, inclusive);
   }
 
   /**
@@ -56,7 +56,7 @@ public abstract class RangePartitionAlgorithm {
    * @return
    */
   public static BigDecimal computeCardinality(DataType dataType, Datum start, Datum end,
-                                              boolean inclusive) {
+                                              boolean inclusive, boolean isAscending) {
     BigDecimal columnCard;
 
     switch (dataType.getType()) {
@@ -64,35 +64,75 @@ public abstract class RangePartitionAlgorithm {
         columnCard = new BigDecimal(2);
         break;
       case CHAR:
-        columnCard = new BigDecimal(end.asChar() - start.asChar());
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asChar() - start.asChar());
+        } else {
+          columnCard = new BigDecimal(start.asChar() - end.asChar());
+        }
         break;
       case BIT:
-        columnCard = new BigDecimal(end.asByte() - start.asByte());
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asByte() - start.asByte());
+        } else {
+          columnCard = new BigDecimal(start.asByte() - end.asByte());
+        }
         break;
       case INT2:
-        columnCard = new BigDecimal(end.asInt2() - start.asInt2());
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt2() - start.asInt2());
+        } else {
+          columnCard = new BigDecimal(start.asInt2() - end.asInt2());
+        }
         break;
       case INT4:
-        columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+        } else {
+          columnCard = new BigDecimal(start.asInt4() - end.asInt4());
+        }
         break;
       case INT8:
-        columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+        } else {
+          columnCard = new BigDecimal(start.asInt8() - end.asInt8());
+        }
         break;
       case FLOAT4:
-        columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+        } else {
+          columnCard = new BigDecimal(start.asInt4() - end.asInt4());
+        }
         break;
       case FLOAT8:
-        columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+        } else {
+          columnCard = new BigDecimal(start.asInt8() - end.asInt8());
+        }
         break;
       case TEXT:
-        columnCard = new BigDecimal(end.asChars().charAt(0) - start.asChars().charAt(0));
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asChars().charAt(0) - start.asChars().charAt(0));
+        } else {
+          columnCard = new BigDecimal(start.asChars().charAt(0) - end.asChars().charAt(0));
+        }
         break;
       case DATE:
-        columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+        } else {
+          columnCard = new BigDecimal(start.asInt4() - end.asInt4());
+        }
         break;
       case TIME:
       case TIMESTAMP:
-        columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+        } else {
+          columnCard = new BigDecimal(start.asInt8() - end.asInt8());
+        }
         break;
       default:
         throw new UnsupportedOperationException(dataType + " is not supported yet");
@@ -105,16 +145,16 @@ public abstract class RangePartitionAlgorithm {
    * It computes the value cardinality of a tuple range.
    * @return
    */
-  public static BigDecimal computeCardinalityForAllColumns(Schema schema, TupleRange range, boolean inclusive) {
+  public static BigDecimal computeCardinalityForAllColumns(SortSpec[] sortSpecs, TupleRange range, boolean inclusive) {
     Tuple start = range.getStart();
     Tuple end = range.getEnd();
     Column col;
 
     BigDecimal cardinality = new BigDecimal(1);
     BigDecimal columnCard;
-    for (int i = 0; i < schema.getColumnNum(); i++) {
-      col = schema.getColumn(i);
-      columnCard = computeCardinality(col.getDataType(), start.get(i), end.get(i), inclusive);
+    for (int i = 0; i < sortSpecs.length; i++) {
+      columnCard = computeCardinality(sortSpecs[i].getSortKey().getDataType(), start.get(i), end.get(i), inclusive,
+          sortSpecs[i].isAscending());
 
       if (new BigDecimal(0).compareTo(columnCard) < 0) {
         cardinality = cardinality.multiply(columnCard);

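The RangePartitionAlgorithm change generalizes the cardinality computation to descending sort keys: for an ascending key the range cardinality is derived from end - start, for a descending key from start - end. A small self-contained sketch of that idea for a single INT4 key follows; the helper name and the inclusive handling are illustrative assumptions, not the exact Tajo formula.

import java.math.BigDecimal;

// Minimal sketch: the sign of the difference flips with the sort direction,
// so descending ranges (start > end) still yield a positive cardinality.
public class ColumnCardinalitySketch {
  static BigDecimal int4Cardinality(int start, int end, boolean inclusive, boolean ascending) {
    BigDecimal card = ascending
        ? new BigDecimal(end - start)
        : new BigDecimal(start - end);
    return inclusive ? card.add(BigDecimal.ONE) : card;   // assumed inclusive handling
  }

  public static void main(String[] args) {
    System.out.println(int4Cardinality(1, 100, true, true));   // 100 (ascending key)
    System.out.println(int4Cardinality(100, 1, true, false));  // 100 (descending key)
  }
}
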
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
index 4f18c95..948b19e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
@@ -21,7 +21,7 @@ package org.apache.tajo.engine.planner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.exception.RangeOverflowException;
@@ -33,6 +33,7 @@ import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.util.List;
 
+
 public class UniformRangePartition extends RangePartitionAlgorithm {
   private int variableId;
   private BigDecimal[] cardForEachDigit;
@@ -40,16 +41,16 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
 
   /**
    *
-   * @param schema
-   * @param range
+   * @param totalRange
+   * @param sortSpecs The description of sort keys
    * @param inclusive true if the end of the range is inclusive
    */
-  public UniformRangePartition(Schema schema, TupleRange range, boolean inclusive) {
-    super(schema, range, inclusive);
-    colCards = new BigDecimal[schema.getColumnNum()];
-    for (int i = 0; i < schema.getColumnNum(); i++) {
-      colCards[i] = computeCardinality(schema.getColumn(i).getDataType(), range.getStart().get(i),
-          range.getEnd().get(i), inclusive);
+  public UniformRangePartition(TupleRange totalRange, SortSpec[] sortSpecs, boolean inclusive) {
+    super(sortSpecs, totalRange, inclusive);
+    colCards = new BigDecimal[sortSpecs.length];
+    for (int i = 0; i < sortSpecs.length; i++) {
+      colCards[i] = computeCardinality(sortSpecs[i].getSortKey().getDataType(), totalRange.getStart().get(i),
+          totalRange.getEnd().get(i), inclusive, sortSpecs[i].isAscending());
     }
 
     cardForEachDigit = new BigDecimal[colCards.length];
@@ -62,15 +63,15 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
     }
   }
 
-  public UniformRangePartition(Schema schema, TupleRange range) {
-    this(schema, range, true);
+  public UniformRangePartition(TupleRange range, SortSpec [] sortSpecs) {
+    this(range, sortSpecs, true);
   }
 
   @Override
   public TupleRange[] partition(int partNum) {
     Preconditions.checkArgument(partNum > 0,
         "The number of partitions must be positive, but the given number: "
-        + partNum);
+            + partNum);
     Preconditions.checkArgument(totalCard.compareTo(new BigDecimal(partNum)) >= 0,
         "the number of partition cannot exceed total cardinality (" + totalCard + ")");
 
@@ -97,10 +98,10 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
     Tuple last = range.getStart();
     while(reminder.compareTo(new BigDecimal(0)) > 0) {
       if (reminder.compareTo(term) <= 0) { // final one is inclusive
-        ranges.add(new TupleRange(schema, last, range.getEnd()));
+        ranges.add(new TupleRange(sortSpecs, last, range.getEnd()));
       } else {
         Tuple next = increment(last, term.longValue(), variableId);
-        ranges.add(new TupleRange(schema, last, next));
+        ranges.add(new TupleRange(sortSpecs, last, next));
       }
       last = ranges.get(ranges.size() - 1).getEnd();
       reminder = reminder.subtract(term);
@@ -109,49 +110,103 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
     return ranges.toArray(new TupleRange[ranges.size()]);
   }
 
-  public boolean isOverflow(int colId, Datum last, BigDecimal inc) {
-    Column column = schema.getColumn(colId);
+  /**
+   * Checks whether an overflow occurs or not.
+   *
+   * @param colId The column id to be checked
+   * @param last The last datum value of the column
+   * @param inc The increment to be applied to the last value
+   * @param sortSpecs The sort specifications that determine whether each key is ascending or descending
+   * @return true if applying the increment goes beyond the end of the range
+   */
+  public boolean isOverflow(int colId, Datum last, BigDecimal inc, SortSpec [] sortSpecs) {
+    Column column = sortSpecs[colId].getSortKey();
     BigDecimal candidate;
     boolean overflow = false;
+
     switch (column.getDataType().getType()) {
       case BIT: {
-        candidate = inc.add(new BigDecimal(last.asByte()));
-        return new BigDecimal(range.getEnd().get(colId).asByte()).compareTo(candidate) < 0;
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal(last.asByte()));
+          return new BigDecimal(range.getEnd().get(colId).asByte()).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal(last.asByte()).subtract(inc);
+          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asByte())) < 0;
+        }
       }
       case CHAR: {
-        candidate = inc.add(new BigDecimal((int)last.asChar()));
-        return new BigDecimal((int)range.getEnd().get(colId).asChar()).compareTo(candidate) < 0;
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal((int)last.asChar()));
+          return new BigDecimal((int)range.getEnd().get(colId).asChar()).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal((int)last.asChar()).subtract(inc);
+          return candidate.compareTo(new BigDecimal((int)range.getEnd().get(colId).asChar())) < 0;
+        }
       }
       case INT2: {
-        candidate = inc.add(new BigDecimal(last.asInt2()));
-        return new BigDecimal(range.getEnd().get(colId).asInt2()).compareTo(candidate) < 0;
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal(last.asInt2()));
+          return new BigDecimal(range.getEnd().get(colId).asInt2()).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal(last.asInt2()).subtract(inc);
+          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asInt2())) < 0;
+        }
       }
+      case DATE:
       case INT4: {
-        candidate = inc.add(new BigDecimal(last.asInt4()));
-        return new BigDecimal(range.getEnd().get(colId).asInt4()).compareTo(candidate) < 0;
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal(last.asInt4()));
+          return new BigDecimal(range.getEnd().get(colId).asInt4()).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal(last.asInt4()).subtract(inc);
+          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asInt4())) < 0;
+        }
       }
+      case TIME:
+      case TIMESTAMP:
       case INT8: {
-        candidate = inc.add(new BigDecimal(last.asInt8()));
-        return new BigDecimal(range.getEnd().get(colId).asInt8()).compareTo(candidate) < 0;
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal(last.asInt8()));
+          return new BigDecimal(range.getEnd().get(colId).asInt8()).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal(last.asInt8()).subtract(inc);
+          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asInt8())) < 0;
+        }
       }
       case FLOAT4: {
-        candidate = inc.add(new BigDecimal(last.asFloat4()));
-        return new BigDecimal(range.getEnd().get(colId).asFloat4()).compareTo(candidate) < 0;
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal(last.asFloat4()));
+          return new BigDecimal(range.getEnd().get(colId).asFloat4()).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal(last.asFloat4()).subtract(inc);
+          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asFloat4())) < 0;
+        }
       }
       case FLOAT8: {
-        candidate = inc.add(new BigDecimal(last.asFloat8()));
-        return new BigDecimal(range.getEnd().get(colId).asFloat8()).compareTo(candidate) < 0;
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal(last.asFloat8()));
+          return new BigDecimal(range.getEnd().get(colId).asFloat8()).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal(last.asFloat8()).subtract(inc);
+          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asFloat8())) < 0;
+        }
+
       }
       case TEXT: {
-        candidate = inc.add(new BigDecimal((int)(last.asChars().charAt(0))));
-        return new BigDecimal(range.getEnd().get(colId).asChars().charAt(0)).compareTo(candidate) < 0;
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal((int)(last.asChars().charAt(0))));
+          return new BigDecimal(range.getEnd().get(colId).asChars().charAt(0)).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal((int)(last.asChars().charAt(0))).subtract(inc);
+          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asChars().charAt(0))) < 0;
+        }
       }
     }
     return overflow;
   }
 
   public long incrementAndGetReminder(int colId, Datum last, long inc) {
-    Column column = schema.getColumn(colId);
+    Column column = sortSpecs[colId].getSortKey();
     long reminder = 0;
     switch (column.getDataType().getType()) {
       case BIT: {
@@ -166,12 +221,15 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
         reminder = candidate - end;
         break;
       }
+      case DATE:
       case INT4: {
         int candidate = (int) (last.asInt4() + inc);
         int end = range.getEnd().get(colId).asInt4();
         reminder = candidate - end;
         break;
       }
+      case TIME:
+      case TIMESTAMP:
       case INT8: {
         long candidate = last.asInt8() + inc;
         long end = range.getEnd().get(colId).asInt8();
@@ -231,7 +289,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
     int finalId = baseDigit;
     incs[finalId] = value;
     for (int i = finalId; i >= 0; i--) {
-      if (isOverflow(i, last.get(i), incs[i])) {
+      if (isOverflow(i, last.get(i), incs[i], sortSpecs)) {
         if (i == 0) {
           throw new RangeOverflowException(range, last, incs[i].longValue());
         }
@@ -253,10 +311,10 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
       }
     }
 
-    Tuple end = new VTuple(schema.getColumnNum());
+    Tuple end = new VTuple(sortSpecs.length);
     Column column;
     for (int i = 0; i < last.size(); i++) {
-      column = schema.getColumn(i);
+      column = sortSpecs[i].getSortKey();
       switch (column.getDataType().getType()) {
         case CHAR:
           if (overflowFlag[i]) {
@@ -286,13 +344,17 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
             end.put(i, DatumFactory.createInt4(
                 (int) (range.getStart().get(i).asInt4() + incs[i].longValue())));
           } else {
-            end.put(i, DatumFactory.createInt4((int) (last.get(i).asInt4() + incs[i].longValue())));
+            if (sortSpecs[i].isAscending()) {
+              end.put(i, DatumFactory.createInt4((int) (last.get(i).asInt4() + incs[i].longValue())));
+            } else {
+              end.put(i, DatumFactory.createInt4((int) (last.get(i).asInt4() - incs[i].longValue())));
+            }
           }
           break;
         case INT8:
           if (overflowFlag[i]) {
             end.put(i, DatumFactory.createInt8(
-                range.getStart().get(i).asInt4() + incs[i].longValue()));
+                range.getStart().get(i).asInt8() + incs[i].longValue()));
           } else {
             end.put(i, DatumFactory.createInt8(last.get(i).asInt8() + incs[i].longValue()));
           }
@@ -322,6 +384,28 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
                 ((char) (last.get(i).asChars().charAt(0) + incs[i].longValue())) + ""));
           }
           break;
+        case DATE:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createDate((int) (range.getStart().get(i).asInt4() + incs[i].longValue())));
+          } else {
+            end.put(i, DatumFactory.createDate((int) (last.get(i).asInt4() + incs[i].longValue())));
+          }
+          break;
+        case TIME:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createTime(range.getStart().get(i).asInt8() + incs[i].longValue()));
+          } else {
+            end.put(i, DatumFactory.createTime(last.get(i).asInt8() + incs[i].longValue()));
+          }
+          break;
+        case TIMESTAMP:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createTimeStampFromMillis(
+                range.getStart().get(i).asInt8() + incs[i].longValue()));
+          } else {
+            end.put(i, DatumFactory.createTimeStampFromMillis(last.get(i).asInt8() + incs[i].longValue()));
+          }
+          break;
         default:
           throw new UnsupportedOperationException(column.getDataType() + " is not supported yet");
       }

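UniformRangePartition divides the total key range into nearly equal subranges: the total cardinality is divided by the partition count, and the range start is advanced by that term until the remainder is exhausted, with the final subrange absorbing whatever is left. The sketch below shows the same loop over plain integers instead of Tajo's TupleRange and Datum types.

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.List;

// Minimal sketch: split [start, end] into partNum nearly equal subranges,
// mirroring the term/remainder loop of the partition() method.
public class UniformRangeSketch {
  static List<int[]> partition(int start, int end, int partNum) {
    BigDecimal totalCard = new BigDecimal(end - start + 1);
    BigDecimal term = totalCard.divide(new BigDecimal(partNum), RoundingMode.CEILING);
    List<int[]> ranges = new ArrayList<>();
    BigDecimal remainder = totalCard;
    int last = start;
    while (remainder.compareTo(BigDecimal.ZERO) > 0) {
      if (remainder.compareTo(term) <= 0) {      // final subrange is inclusive of the end
        ranges.add(new int[]{last, end});
      } else {
        int next = last + term.intValue();
        ranges.add(new int[]{last, next});
        last = next;
      }
      remainder = remainder.subtract(term);
    }
    return ranges;
  }

  public static void main(String[] args) {
    for (int[] r : partition(0, 99, 4)) {
      System.out.println(r[0] + " .. " + r[1]);  // four subranges covering 0..99
    }
  }
}
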
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 2aa93d7..d040d7e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -469,6 +469,7 @@ public class GlobalPlanner {
         currentNode.setChild(secondScan);
         currentNode.setInSchema(secondScan.getOutSchema());
         currentBlock.setPlan(currentNode);
+        currentBlock.getEnforcer().addSortedInput(secondScan.getTableName(), currentNode.getSortKeys());
       }
     } else {
       LogicalNode childBlockPlan = childBlock.getPlan();
@@ -487,6 +488,7 @@ public class GlobalPlanner {
       currentNode.setChild(secondScan);
       currentNode.setInSchema(secondScan.getOutSchema());
       currentBlock.setPlan(currentNode);
+      currentBlock.getEnforcer().addSortedInput(secondScan.getTableName(), currentNode.getSortKeys());
       masterPlan.addConnect(channel);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index a3b37fc..f0ac290 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -27,11 +27,14 @@ import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.logical.SortNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
@@ -76,8 +79,10 @@ public class ExternalSortExec extends SortExec {
   private final LocalDirAllocator localDirAllocator;
   /** local file system */
   private final RawLocalFileSystem localFS;
-  /** final output files */
+  /** final output files which are used for cleaning */
   private List<Path> finalOutputFiles = null;
+  /** for directly merging sorted inputs */
+  private List<Path> mergedInputPaths = null;
 
   ///////////////////////////////////////////////////
   // transient variables
@@ -89,10 +94,10 @@ public class ExternalSortExec extends SortExec {
   /** the final result */
   private Scanner result;
 
-  public ExternalSortExec(final TaskAttemptContext context,
-                          final AbstractStorageManager sm, final SortNode plan, final PhysicalExec child)
-      throws IOException {
-    super(context, plan.getInSchema(), plan.getOutSchema(), child, plan.getSortKeys());
+  private ExternalSortExec(final TaskAttemptContext context, final AbstractStorageManager sm, final SortNode plan)
+      throws PhysicalPlanningException {
+    super(context, plan.getInSchema(), plan.getOutSchema(), null, plan.getSortKeys());
+
     this.plan = plan;
     this.meta = CatalogUtil.newTableMeta(StoreType.ROWFILE);
 
@@ -111,6 +116,25 @@ public class ExternalSortExec extends SortExec {
     localFS = new RawLocalFileSystem();
   }
 
+  public ExternalSortExec(final TaskAttemptContext context,
+                          final AbstractStorageManager sm, final SortNode plan,
+                          final CatalogProtos.FragmentProto[] fragments) throws PhysicalPlanningException {
+    this(context, sm, plan);
+
+    mergedInputPaths = TUtil.newList();
+    for (CatalogProtos.FragmentProto proto : fragments) {
+      FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto);
+      mergedInputPaths.add(fragment.getPath());
+    }
+  }
+
+  public ExternalSortExec(final TaskAttemptContext context,
+                          final AbstractStorageManager sm, final SortNode plan, final PhysicalExec child)
+      throws IOException {
+    this(context, sm, plan);
+    setChild(child);
+  }
+
   public void init() throws IOException {
     super.init();
   }
@@ -120,9 +144,9 @@ public class ExternalSortExec extends SortExec {
   }
 
   /**
-   * Sort tuple block and store them into a chunk file
+   * Sorts a tuple block and stores it in a chunk file
    */
-  private final Path sortAndStoreChunk(int chunkId, List<Tuple> tupleBlock)
+  private Path sortAndStoreChunk(int chunkId, List<Tuple> tupleBlock)
       throws IOException {
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.RAW);
     int rowNum = tupleBlock.size();
@@ -156,7 +180,7 @@ public class ExternalSortExec extends SortExec {
    * @return All paths of chunks
    * @throws java.io.IOException
    */
-  private final List<Path> sortAndStoreAllChunks() throws IOException {
+  private List<Path> sortAndStoreAllChunks() throws IOException {
     Tuple tuple;
     int memoryConsumption = 0;
     List<Path> chunkPaths = TUtil.newList();
@@ -212,22 +236,31 @@ public class ExternalSortExec extends SortExec {
 
     if (!sorted) { // if not sorted, first sort all data
 
-      // Try to sort all data, and store them into a number of chunks if memory exceeds
-      long startTimeOfChunkSplit = System.currentTimeMillis();
-      List<Path> chunks = sortAndStoreAllChunks();
-      long endTimeOfChunkSplit = System.currentTimeMillis();
-      info(LOG, "Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec");
-
-      if (memoryResident) { // if all sorted data reside in a main-memory table.
-        this.result = new MemTableScanner();
-      } else { // if input data exceeds main-memory at least once
-
+      // if input files are given, it starts merging directly.
+      if (mergedInputPaths != null) {
         try {
-          this.result = externalMergeAndSort(chunks);
-        } catch (Exception exception) {
-          throw new PhysicalPlanningException(exception);
+          this.result = externalMergeAndSort(mergedInputPaths);
+        } catch (Exception e) {
+          throw new PhysicalPlanningException(e);
         }
+      } else {
+        // Try to sort all data, and store them as multiple chunks if the data do not fit in memory
+        long startTimeOfChunkSplit = System.currentTimeMillis();
+        List<Path> chunks = sortAndStoreAllChunks();
+        long endTimeOfChunkSplit = System.currentTimeMillis();
+        info(LOG, "Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec");
+
+        if (memoryResident) { // if all sorted data reside in a main-memory table.
+          this.result = new MemTableScanner();
+        } else { // if input data exceeds main-memory at least once
+
+          try {
+            this.result = externalMergeAndSort(chunks);
+          } catch (Exception e) {
+            throw new PhysicalPlanningException(e);
+          }
 
+        }
       }
 
       sorted = true;
@@ -511,7 +544,7 @@ public class ExternalSortExec extends SortExec {
           outTuple = rightTuple;
           rightTuple = rightScan.next();
         }
-        return outTuple;
+        return new VTuple(outTuple);
       }
 
       if (leftTuple == null) {

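With the ExternalSortExec change, a task that receives already-sorted input files skips the sort-and-spill phase entirely and goes straight to the external merge. The sketch below shows the core of such a k-way merge over in-memory runs using a priority queue; it is a generic illustration (plain lists and ints), not Tajo's scanner-based merge implementation.

import java.util.*;

// Minimal sketch: merge k already-sorted runs into one sorted output,
// which is the phase ExternalSortExec jumps to for pre-sorted inputs.
public class KWayMergeSketch {
  static List<Integer> merge(List<List<Integer>> sortedRuns) {
    // heap entries: {value, runIndex, offsetInRun}
    PriorityQueue<int[]> heap = new PriorityQueue<>(Comparator.comparingInt((int[] e) -> e[0]));
    for (int r = 0; r < sortedRuns.size(); r++) {
      if (!sortedRuns.get(r).isEmpty()) {
        heap.add(new int[]{sortedRuns.get(r).get(0), r, 0});
      }
    }
    List<Integer> merged = new ArrayList<>();
    while (!heap.isEmpty()) {
      int[] e = heap.poll();
      merged.add(e[0]);
      List<Integer> run = sortedRuns.get(e[1]);
      int next = e[2] + 1;
      if (next < run.size()) {
        heap.add(new int[]{run.get(next), e[1], next});
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    List<List<Integer>> runs = Arrays.asList(
        Arrays.asList(1, 4, 9),
        Arrays.asList(2, 3, 10),
        Arrays.asList(5, 6, 7));
    System.out.println(merge(runs)); // [1, 2, 3, 4, 5, 6, 7, 9, 10]
  }
}
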
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
index a8dd877..f31455f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
@@ -44,14 +44,20 @@ public abstract class UnaryPhysicalExec extends PhysicalExec {
   }
 
   public void init() throws IOException {
-    child.init();
+    if (child != null) {
+      child.init();
+    }
   }
 
   public void rescan() throws IOException {
-    child.rescan();
+    if (child != null) {
+      child.rescan();
+    }
   }
 
   public void close() throws IOException {
-    child.close();
+    if (child != null) {
+      child.close();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
index 42508a0..b96b65e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -22,26 +22,20 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.statistics.ColumnStats;
-import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.worker.dataserver.HttpUtil;
 
 import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.util.Collection;
 import java.util.Iterator;
@@ -49,279 +43,8 @@ import java.util.List;
 import java.util.Map;
 
 public class TupleUtil {
-  /** class logger **/
-  private static final Log LOG = LogFactory.getLog(TupleUtil.class);
 
-  /**
-   * It computes the value cardinality of a tuple range.
-   *
-   * @param schema
-   * @param range
-   * @return
-   */
-  public static long computeCardinality(Schema schema, TupleRange range) {
-    Tuple start = range.getStart();
-    Tuple end = range.getEnd();
-    Column col;
-
-    long cardinality = 1;
-    long columnCard;
-    for (int i = 0; i < schema.getColumnNum(); i++) {
-      col = schema.getColumn(i);
-      switch (col.getDataType().getType()) {
-        case CHAR:
-          columnCard = end.get(i).asChar() - start.get(i).asChar();
-          break;
-        case BIT:
-          columnCard = end.get(i).asByte() - start.get(i).asByte();
-          break;
-        case INT2:
-          columnCard = end.get(i).asInt2() - start.get(i).asInt2();
-          break;
-        case INT4:
-          columnCard = end.get(i).asInt4() - start.get(i).asInt4();
-          break;
-        case INT8:
-          columnCard = end.get(i).asInt8() - start.get(i).asInt8();
-          break;
-        case FLOAT4:
-          columnCard = end.get(i).asInt4() - start.get(i).asInt4();
-          break;
-        case FLOAT8:
-          columnCard = end.get(i).asInt8() - start.get(i).asInt8();
-          break;
-        case TEXT:
-          columnCard = end.get(i).asChars().charAt(0) - start.get(i).asChars().charAt(0);
-          break;
-        default:
-          throw new UnsupportedOperationException(col.getDataType() + " is not supported yet");
-      }
-
-      if (columnCard > 0) {
-        cardinality *= columnCard + 1;
-      }
-    }
-
-    return cardinality;
-  }
-
-  public static TupleRange [] getPartitions(Schema schema, int partNum, TupleRange range) {
-    Tuple start = range.getStart();
-    Tuple end = range.getEnd();
-    Column col;
-    TupleRange [] partitioned = new TupleRange[partNum];
-
-    Datum[] term = new Datum[schema.getColumnNum()];
-    Datum[] prevValues = new Datum[schema.getColumnNum()];
-
-    // initialize term and previous values
-    for (int i = 0; i < schema.getColumnNum(); i++) {
-      col = schema.getColumn(i);
-      prevValues[i] = start.get(i);
-      switch (col.getDataType().getType()) {
-        case CHAR:
-          int sChar = start.get(i).asChar();
-          int eChar = end.get(i).asChar();
-          int rangeChar;
-          if ((eChar - sChar) > partNum) {
-            rangeChar = (eChar - sChar) / partNum;
-          } else {
-            rangeChar = 1;
-          }
-          term[i] = DatumFactory.createInt4(rangeChar);
-        case BIT:
-          byte sByte = start.get(i).asByte();
-          byte eByte = end.get(i).asByte();
-          int rangeByte;
-          if ((eByte - sByte) > partNum) {
-            rangeByte = (eByte - sByte) / partNum;
-          } else {
-            rangeByte = 1;
-          }
-          term[i] = DatumFactory.createBit((byte) rangeByte);
-          break;
-
-        case INT2:
-          short sShort = start.get(i).asInt2();
-          short eShort = end.get(i).asInt2();
-          int rangeShort;
-          if ((eShort - sShort) > partNum) {
-            rangeShort = (eShort - sShort) / partNum;
-          } else {
-            rangeShort = 1;
-          }
-          term[i] = DatumFactory.createInt2((short) rangeShort);
-          break;
-
-        case INT4:
-          int sInt = start.get(i).asInt4();
-          int eInt = end.get(i).asInt4();
-          int rangeInt;
-          if ((eInt - sInt) > partNum) {
-            rangeInt = (eInt - sInt) / partNum;
-          } else {
-            rangeInt = 1;
-          }
-          term[i] = DatumFactory.createInt4(rangeInt);
-          break;
-
-        case INT8:
-          long sLong = start.get(i).asInt8();
-          long eLong = end.get(i).asInt8();
-          long rangeLong;
-          if ((eLong - sLong) > partNum) {
-            rangeLong = ((eLong - sLong) / partNum);
-          } else {
-            rangeLong = 1;
-          }
-          term[i] = DatumFactory.createInt8(rangeLong);
-          break;
-
-        case FLOAT4:
-          float sFloat = start.get(i).asFloat4();
-          float eFloat = end.get(i).asFloat4();
-          float rangeFloat;
-          if ((eFloat - sFloat) > partNum) {
-            rangeFloat = ((eFloat - sFloat) / partNum);
-          } else {
-            rangeFloat = 1;
-          }
-          term[i] = DatumFactory.createFloat4(rangeFloat);
-          break;
-        case FLOAT8:
-          double sDouble = start.get(i).asFloat8();
-          double eDouble = end.get(i).asFloat8();
-          double rangeDouble;
-          if ((eDouble - sDouble) > partNum) {
-            rangeDouble = ((eDouble - sDouble) / partNum);
-          } else {
-            rangeDouble = 1;
-          }
-          term[i] = DatumFactory.createFloat8(rangeDouble);
-          break;
-        case TEXT:
-          char sChars = start.get(i).asChars().charAt(0);
-          char eChars = end.get(i).asChars().charAt(0);
-          int rangeString;
-          if ((eChars - sChars) > partNum) {
-            rangeString = ((eChars - sChars) / partNum);
-          } else {
-            rangeString = 1;
-          }
-          term[i] = DatumFactory.createText(((char) rangeString) + "");
-          break;
-        case INET4:
-          throw new UnsupportedOperationException();
-        case BLOB:
-          throw new UnsupportedOperationException();
-        default:
-          throw new UnsupportedOperationException();
-      }
-    }
-
-    for (int p = 0; p < partNum; p++) {
-      Tuple sTuple = new VTuple(schema.getColumnNum());
-      Tuple eTuple = new VTuple(schema.getColumnNum());
-      for (int i = 0; i < schema.getColumnNum(); i++) {
-        col = schema.getColumn(i);
-        sTuple.put(i, prevValues[i]);
-        switch (col.getDataType().getType()) {
-          case CHAR:
-            char endChar = (char) (prevValues[i].asChar() + term[i].asChar());
-            if (endChar > end.get(i).asByte()) {
-              eTuple.put(i, end.get(i));
-            } else {
-              eTuple.put(i, DatumFactory.createChar(endChar));
-            }
-            prevValues[i] = DatumFactory.createChar(endChar);
-            break;
-          case BIT:
-            byte endByte = (byte) (prevValues[i].asByte() + term[i].asByte());
-            if (endByte > end.get(i).asByte()) {
-              eTuple.put(i, end.get(i));
-            } else {
-              eTuple.put(i, DatumFactory.createBit(endByte));
-            }
-            prevValues[i] = DatumFactory.createBit(endByte);
-            break;
-          case INT2:
-            int endShort = (short) (prevValues[i].asInt2() + term[i].asInt2());
-            if (endShort > end.get(i).asInt2()) {
-              eTuple.put(i, end.get(i));
-            } else {
-              // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createInt2((short) endShort));
-            }
-            prevValues[i] = DatumFactory.createInt2((short) endShort);
-            break;
-          case INT4:
-            int endInt = (prevValues[i].asInt4() + term[i].asInt4());
-            if (endInt > end.get(i).asInt4()) {
-              eTuple.put(i, end.get(i));
-            } else {
-              // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createInt4(endInt));
-            }
-            prevValues[i] = DatumFactory.createInt4(endInt);
-            break;
-
-          case INT8:
-            long endLong = (prevValues[i].asInt8() + term[i].asInt8());
-            if (endLong > end.get(i).asInt8()) {
-              eTuple.put(i, end.get(i));
-            } else {
-              // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createInt8(endLong));
-            }
-            prevValues[i] = DatumFactory.createInt8(endLong);
-            break;
-
-          case FLOAT4:
-            float endFloat = (prevValues[i].asFloat4() + term[i].asFloat4());
-            if (endFloat > end.get(i).asFloat4()) {
-              eTuple.put(i, end.get(i));
-            } else {
-              // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createFloat4(endFloat));
-            }
-            prevValues[i] = DatumFactory.createFloat4(endFloat);
-            break;
-          case FLOAT8:
-            double endDouble = (prevValues[i].asFloat8() + term[i].asFloat8());
-            if (endDouble > end.get(i).asFloat8()) {
-              eTuple.put(i, end.get(i));
-            } else {
-              // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createFloat8(endDouble));
-            }
-            prevValues[i] = DatumFactory.createFloat8(endDouble);
-            break;
-          case TEXT:
-            String endString = ((char)(prevValues[i].asChars().charAt(0) + term[i].asChars().charAt(0))) + "";
-            if (endString.charAt(0) > end.get(i).asChars().charAt(0)) {
-              eTuple.put(i, end.get(i));
-            } else {
-              // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createText(endString));
-            }
-            prevValues[i] = DatumFactory.createText(endString);
-            break;
-          case INET4:
-            throw new UnsupportedOperationException();
-          case BLOB:
-            throw new UnsupportedOperationException();
-          default:
-            throw new UnsupportedOperationException();
-        }
-      }
-      partitioned[p] = new TupleRange(schema, sTuple, eTuple);
-    }
-
-    return partitioned;
-  }
-
-  public static String rangeToQuery(Schema schema, TupleRange range,
-                                    boolean ascendingFirstKey, boolean last)
+  public static String rangeToQuery(Schema schema, TupleRange range, boolean last)
       throws UnsupportedEncodingException {
     StringBuilder sb = new StringBuilder();
     byte [] firstKeyBytes = RowStoreUtil.RowStoreEncoder
@@ -345,32 +68,8 @@ public class TupleUtil {
     return sb.toString();
   }
 
-  public static String [] rangesToQueries(final SortSpec[] sortSpec,
-                                          final TupleRange[] ranges)
-      throws UnsupportedEncodingException {
-    Schema schema = PlannerUtil.sortSpecsToSchema(sortSpec);
-    boolean ascendingFirstKey = sortSpec[0].isAscending();
-    String [] params = new String[ranges.length];
-    for (int i = 0; i < ranges.length; i++) {
-      params[i] =
-        rangeToQuery(schema, ranges[i], ascendingFirstKey,
-            ascendingFirstKey ? i == (ranges.length - 1) : i == 0);
-    }
-    return params;
-  }
-
-  public static TupleRange queryToRange(Schema schema, String query) throws UnsupportedEncodingException {
-    Map<String,String> params = HttpUtil.getParamsFromQuery(query);
-    String startUrlDecoded = URLDecoder.decode(params.get("start"), "utf-8");
-    String endUrlDecoded = URLDecoder.decode(params.get("end"), "utf-8");
-    byte [] startBytes = Base64.decodeBase64(startUrlDecoded);
-    byte [] endBytes = Base64.decodeBase64(endUrlDecoded);
-    return new TupleRange(schema, RowStoreUtil.RowStoreDecoder
-        .toTuple(schema, startBytes), RowStoreUtil.RowStoreDecoder
-        .toTuple(schema, endBytes));
-  }
+  public static TupleRange columnStatToRange(SortSpec [] sortSpecs, Schema target, List<ColumnStats> colStats) {
 
-  public static TupleRange columnStatToRange(Schema schema, Schema target, List<ColumnStats> colStats) {
     Map<Column, ColumnStats> statSet = Maps.newHashMap();
     for (ColumnStats stat : colStats) {
       statSet.put(stat.getColumn(), stat);
@@ -385,11 +84,16 @@ public class TupleUtil {
     Tuple endTuple = new VTuple(target.getColumnNum());
     int i = 0;
     for (Column col : target.getColumns()) {
-      startTuple.put(i, statSet.get(col).getMinValue());
-      endTuple.put(i, statSet.get(col).getMaxValue());
+      if (sortSpecs[i].isAscending()) {
+        startTuple.put(i, statSet.get(col).getMinValue());
+        endTuple.put(i, statSet.get(col).getMaxValue());
+      } else {
+        startTuple.put(i, statSet.get(col).getMaxValue());
+        endTuple.put(i, statSet.get(col).getMinValue());
+      }
       i++;
     }
-    return new TupleRange(target, startTuple, endTuple);
+    return new TupleRange(sortSpecs, startTuple, endTuple);
   }
 
   /**

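The reworked TupleUtil.columnStatToRange() now takes the SortSpecs of the range-shuffle key
instead of the channel schema, and for every descending key it swaps the column statistics so
that the start tuple carries the column maximum and the end tuple the minimum. A minimal usage
sketch follows; the (Column, ascending, nullsFirst) SortSpec constructor and the
columnStatsFromChildren list are assumptions used only for illustration, and imports plus the
surrounding class are omitted.

    Schema sortSchema = new Schema().addColumn("l_orderkey", Type.INT8);
    // one descending sort key (constructor shape assumed, not part of this hunk)
    SortSpec[] sortSpecs = { new SortSpec(sortSchema.getColumn(0), false, false) };

    // columnStatsFromChildren: a List<ColumnStats> gathered from the child execution blocks
    TupleRange merged =
        TupleUtil.columnStatToRange(sortSpecs, sortSchema, columnStatsFromChildren);

    // For the descending key, merged.getStart() holds the column maximum and
    // merged.getEnd() the column minimum, so the range follows the actual sort order.
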
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 71ab8f9..8ea824e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -308,8 +308,8 @@ public class Repartitioner {
 
     // calculate the number of maximum query ranges
     TableStats totalStat = computeChildBlocksStats(subQuery.getContext(), masterPlan, subQuery.getId());
-    TupleRange mergedRange = TupleUtil.columnStatToRange(channel.getSchema(), sortSchema, totalStat.getColumnStats());
-    RangePartitionAlgorithm partitioner = new UniformRangePartition(sortSchema, mergedRange);
+    TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats());
+    RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs);
     BigDecimal card = partitioner.getTotalCardinality();
 
     // if the number of the range cardinality is less than the desired number of tasks,
@@ -355,8 +355,8 @@ public class Repartitioner {
       for (int i = 0; i < ranges.length; i++) {
         uris = new HashSet<URI>();
         for (String uri: basicFetchURIs) {
-          String rangeParam = TupleUtil.rangeToQuery(sortSchema, ranges[i],
-              ascendingFirstKey, ascendingFirstKey ? i == (ranges.length - 1) : i == 0);
+          String rangeParam =
+              TupleUtil.rangeToQuery(sortSchema, ranges[i], ascendingFirstKey ? i == (ranges.length - 1) : i == 0);
           URI finalUri = URI.create(uri + "&" + rangeParam);
           uris.add(finalUri);
         }

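The range-shuffle scheduling path in Repartitioner now feeds the sort specs straight into the
partitioner. A condensed sketch of the flow, assuming determinedTaskNum and baseUri stand in
for the task count and fetch URI computed earlier in the method; checked
UnsupportedEncodingException handling is omitted.

    TupleRange mergedRange =
        TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats());
    RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs);
    TupleRange[] ranges = partitioner.partition(determinedTaskNum);

    boolean ascendingFirstKey = sortSpecs[0].isAscending();
    for (int i = 0; i < ranges.length; i++) {
      // the terminal range sits at the end for an ascending first key, at the front otherwise
      boolean last = ascendingFirstKey ? i == (ranges.length - 1) : i == 0;
      String rangeParam = TupleUtil.rangeToQuery(sortSchema, ranges[i], last);
      URI fetchUri = URI.create(baseUri + "&" + rangeParam);
    }
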
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
index 26991a0..a54fa80 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
@@ -29,7 +29,6 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
-import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.index.bst.BSTIndex;
 import org.apache.tajo.worker.dataserver.retriever.FileChunk;
 import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler;
@@ -113,7 +112,7 @@ public class RangeRetrieverHandler implements RetrieverHandler {
       startOffset = idxReader.find(start);
     } catch (IOException ioe) {
       LOG.error("State Dump (the requested range: "
-          + new TupleRange(schema, start, end) + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+          + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
           + idxReader.getLastKey());
       throw ioe;
     }
@@ -124,7 +123,7 @@ public class RangeRetrieverHandler implements RetrieverHandler {
       }
     } catch (IOException ioe) {
       LOG.error("State Dump (the requested range: "
-          + new TupleRange(schema, start, end) + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+          + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
           + idxReader.getLastKey());
       throw ioe;
     }
@@ -136,7 +135,7 @@ public class RangeRetrieverHandler implements RetrieverHandler {
         startOffset = idxReader.find(start, true);
       } catch (IOException ioe) {
         LOG.error("State Dump (the requested range: "
-            + new TupleRange(schema, start, end) + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+            + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
             + idxReader.getLastKey());
         throw ioe;
       }
@@ -145,7 +144,7 @@ public class RangeRetrieverHandler implements RetrieverHandler {
     if (startOffset == -1) {
       throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
           "State Dump (the requested range: "
-          + new TupleRange(schema, start, end) + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+          + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
           + idxReader.getLastKey());
     }
 

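The state-dump messages above stop constructing a TupleRange, presumably because this patch
ties TupleRange to SortSpecs and the handler only has the raw start/end tuples at hand. If the
sort specs were threaded down to RangeRetrieverHandler (they are not in this change), an
equivalent dump would look like the following hypothetical line:

    // hypothetical: sortSpecs is not available in RangeRetrieverHandler as of this patch
    LOG.error("State Dump (the requested range: " + new TupleRange(sortSpecs, start, end)
        + ", idx min: " + idxReader.getFirstKey() + ", idx max: " + idxReader.getLastKey());
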
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 1b72d13..8a0b63a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -166,7 +166,7 @@ public class Task {
       this.shuffleType = context.getDataChannel().getShuffleType();
 
       if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
-        SortNode sortNode = (SortNode) PlannerUtil.findTopNode(plan, NodeType.SORT);
+        SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
         this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
         this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys());
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
index acd663f..3d5cdf2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.engine.planner;
 
+import org.apache.tajo.catalog.SortSpec;
 import org.junit.Test;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.Type;
@@ -39,6 +40,9 @@ public class TestUniformRangePartition {
     Schema schema = new Schema()
     .addColumn("l_returnflag", Type.TEXT)
     .addColumn("l_linestatus", Type.TEXT);
+
+    SortSpec[] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
     Tuple s = new VTuple(2);
     s.put(0, DatumFactory.createText("A"));
     s.put(1, DatumFactory.createText("A"));
@@ -46,11 +50,10 @@ public class TestUniformRangePartition {
     e.put(0, DatumFactory.createText("D"));
     e.put(1, DatumFactory.createText("C"));
 
-    TupleRange expected = new TupleRange(schema, s, e);
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
 
-    UniformRangePartition partitioner =
-        new UniformRangePartition(schema, expected);
-    assertEquals(12, TupleUtil.computeCardinality(schema, expected));
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    assertEquals(12, partitioner.getTotalCardinality().intValue());
 
     String [] result = new String[12];
     result[0] = "AA";
@@ -84,6 +87,9 @@ public class TestUniformRangePartition {
     Schema schema = new Schema()
     .addColumn("l_returnflag", Type.TEXT)
     .addColumn("l_linestatus", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
     Tuple s = new VTuple(2);
     s.put(0, DatumFactory.createText("A"));
     s.put(1, DatumFactory.createText("A"));
@@ -91,11 +97,10 @@ public class TestUniformRangePartition {
     e.put(0, DatumFactory.createText("D"));
     e.put(1, DatumFactory.createText("C"));
 
-    TupleRange expected = new TupleRange(schema, s, e);
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
 
-    UniformRangePartition partitioner =
-        new UniformRangePartition(schema, expected);
-    assertEquals(12, TupleUtil.computeCardinality(schema, expected));
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    assertEquals(12, partitioner.getTotalCardinality().intValue());
 
     String [] result = new String[12];
     result[0] = "AA";
@@ -129,6 +134,8 @@ public class TestUniformRangePartition {
     .addColumn("l_linestatus", Type.TEXT)
     .addColumn("final", Type.TEXT);
 
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
     Tuple s = new VTuple(3);
     s.put(0, DatumFactory.createText("A"));
     s.put(1, DatumFactory.createText("A"));
@@ -138,11 +145,10 @@ public class TestUniformRangePartition {
     e.put(1, DatumFactory.createText("B")); //  2
     e.put(2, DatumFactory.createText("C")); // x3 = 24
 
-    TupleRange expected = new TupleRange(schema, s, e);
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
 
-    UniformRangePartition partitioner =
-        new UniformRangePartition(schema, expected);
-    assertEquals(24, TupleUtil.computeCardinality(schema, expected));
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    assertEquals(24, partitioner.getTotalCardinality().intValue());
 
     Tuple overflowBefore = partitioner.increment(s, 5, 2);
     assertEquals("A", overflowBefore.get(0).asChars());
@@ -159,6 +165,9 @@ public class TestUniformRangePartition {
     Schema schema = new Schema()
     .addColumn("l_orderkey", Type.INT8)
     .addColumn("l_linenumber", Type.INT8);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
     Tuple s = new VTuple(2);
     s.put(0, DatumFactory.createInt8(10));
     s.put(1, DatumFactory.createInt8(20));
@@ -166,10 +175,9 @@ public class TestUniformRangePartition {
     e.put(0, DatumFactory.createInt8(19));
     e.put(1, DatumFactory.createInt8(39));
 
-    TupleRange expected = new TupleRange(schema, s, e);
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
 
-    UniformRangePartition partitioner =
-        new UniformRangePartition(schema, expected);
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
     assertEquals(200, partitioner.getTotalCardinality().longValue());
 
     Tuple range2 = partitioner.increment(s, 100, 1);
@@ -185,6 +193,9 @@ public class TestUniformRangePartition {
     .addColumn("l_orderkey", Type.INT8)
     .addColumn("l_linenumber", Type.INT8)
     .addColumn("final", Type.INT8);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
     Tuple s = new VTuple(3);
     s.put(0, DatumFactory.createInt8(1));
     s.put(1, DatumFactory.createInt8(1));
@@ -194,10 +205,9 @@ public class TestUniformRangePartition {
     e.put(1, DatumFactory.createInt8(2)); // 2
     e.put(2, DatumFactory.createInt8(3)); //x3 = 24
 
-    TupleRange expected = new TupleRange(schema, s, e);
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
 
-    UniformRangePartition partitioner
-        = new UniformRangePartition(schema, expected);
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
     assertEquals(24, partitioner.getTotalCardinality().longValue());
 
     Tuple beforeOverflow = partitioner.increment(s, 5, 2);
@@ -216,6 +226,9 @@ public class TestUniformRangePartition {
       .addColumn("l_orderkey", Type.FLOAT8)
       .addColumn("l_linenumber", Type.FLOAT8)
       .addColumn("final", Type.FLOAT8);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
     Tuple s = new VTuple(3);
     s.put(0, DatumFactory.createFloat8(1.1d));
     s.put(1, DatumFactory.createFloat8(1.1d));
@@ -225,10 +238,9 @@ public class TestUniformRangePartition {
     e.put(1, DatumFactory.createFloat8(2.1d)); // 2
     e.put(2, DatumFactory.createFloat8(3.1d)); //x3 = 24
 
-    TupleRange expected = new TupleRange(schema, s, e);
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
 
-    UniformRangePartition partitioner =
-        new UniformRangePartition(schema, expected);
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
     assertEquals(24, partitioner.getTotalCardinality().longValue());
 
     Tuple beforeOverflow = partitioner.increment(s, 5, 2);
@@ -246,15 +258,18 @@ public class TestUniformRangePartition {
     Schema schema = new Schema();
     schema.addColumn("l_returnflag", Type.TEXT);
     schema.addColumn("l_linestatus", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
     Tuple s = new VTuple(2);
     s.put(0, DatumFactory.createText("A"));
     s.put(1, DatumFactory.createText("F"));
     Tuple e = new VTuple(2);
     e.put(0, DatumFactory.createText("R"));
     e.put(1, DatumFactory.createText("O"));
-    TupleRange expected = new TupleRange(schema, s, e);
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
     RangePartitionAlgorithm partitioner
-        = new UniformRangePartition(schema, expected, true);
+        = new UniformRangePartition(expected, sortSpecs, true);
     TupleRange [] ranges = partitioner.partition(31);
 
 
@@ -273,15 +288,18 @@ public class TestUniformRangePartition {
     Schema schema = new Schema()
       .addColumn("l_returnflag", Type.TEXT)
       .addColumn("l_linestatus", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
     Tuple s = new VTuple(2);
     s.put(0, DatumFactory.createText("A"));
     s.put(1, DatumFactory.createText("F"));
     Tuple e = new VTuple(2);
     e.put(0, DatumFactory.createText("R"));
     e.put(1, DatumFactory.createText("O"));
-    TupleRange expected = new TupleRange(schema, s, e);
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
     RangePartitionAlgorithm partitioner =
-        new UniformRangePartition(schema, expected, true);
+        new UniformRangePartition(expected, sortSpecs, true);
     TupleRange [] ranges = partitioner.partition(1);
 
     assertEquals(expected, ranges[0]);

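The updated tests all follow the same pattern: derive SortSpecs from the schema, build the
TupleRange against those specs, and hand both to the new UniformRangePartition constructor.
Below is a short sketch of the descending case this patch is ultimately after; the
(Column, ascending, nullsFirst) SortSpec constructor is an assumption, and the final comment
describes the expected behaviour rather than an assertion taken from the test suite.

    Schema schema = new Schema().addColumn("l_orderkey", Type.INT8);
    SortSpec[] sortSpecs = { new SortSpec(schema.getColumn(0), false, false) }; // descending

    Tuple s = new VTuple(1);
    s.put(0, DatumFactory.createInt8(100));   // a descending range starts at the maximum
    Tuple e = new VTuple(1);
    e.put(0, DatumFactory.createInt8(1));     // and ends at the minimum

    TupleRange range = new TupleRange(sortSpecs, s, e);
    UniformRangePartition partitioner = new UniformRangePartition(range, sortSpecs, true);
    TupleRange[] ranges = partitioner.partition(4);
    // expected: four contiguous sub-ranges that walk the key space from 100 down to 1
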
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/214b9741/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index cf24054..97932e7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -51,6 +51,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
+import java.util.Stack;
 
 import static org.junit.Assert.assertEquals;
 
@@ -191,7 +192,7 @@ public class TestBSTIndexExec {
     }
 
     @Override
-    public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode)
+    public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> stack)
         throws IOException {
       Preconditions.checkNotNull(ctx.getTable(scanNode.getTableName()),
           "Error: There is no table matched to %s", scanNode.getTableName());