You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/09/16 13:06:11 UTC

[4/6] TAJO-907: Implement off-heap tuple block and zero-copy tuple.

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemTableScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemTableScanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemTableScanner.java
new file mode 100644
index 0000000..7bd6a70
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemTableScanner.java
@@ -0,0 +1,128 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.annotation.NotNull;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+class MemTableScanner implements Scanner {
+  Iterable<Tuple> iterable;
+  Iterator<Tuple> iterator;
+  long inputBytes;
+
+  // for input stats
+  float scannerProgress;
+  int numRecords;
+  int totalRecords;
+  TableStats scannerTableStats;
+
+  public MemTableScanner(@NotNull Collection<Tuple> iterable, long inputBytes) {
+    Preconditions.checkNotNull(iterable);
+    this.iterable = iterable;
+    totalRecords = iterable.size();
+    this.inputBytes = inputBytes;
+  }
+
+  @Override
+  public void init() throws IOException {
+    scannerProgress = 0.0f;
+    numRecords = 0;
+
+    // it will be returned as the final stats
+    scannerTableStats = new TableStats();
+    scannerTableStats.setNumBytes(inputBytes);
+    scannerTableStats.setReadBytes(inputBytes);
+    scannerTableStats.setNumRows(totalRecords);
+
+    iterator = iterable.iterator();
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    if (iterator.hasNext()) {
+      numRecords++;
+      return new VTuple(iterator.next());
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public void reset() throws IOException {
+    init();
+  }
+
+  @Override
+  public void close() throws IOException {
+    scannerProgress = 1.0f;
+  }
+
+  @Override
+  public boolean isProjectable() {
+    return false;
+  }
+
+  @Override
+  public void setTarget(Column[] targets) {
+  }
+
+  @Override
+  public boolean isSelectable() {
+    return false;
+  }
+
+  @Override
+  public void setSearchCondition(Object expr) {
+  }
+
+  @Override
+  public boolean isSplittable() {
+    return false;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return null;
+  }
+
+  @Override
+  public float getProgress() {
+    if (numRecords > 0) {
+      return (float)numRecords / (float)totalRecords;
+
+    } else { // if an input is empty
+      return scannerProgress;
+    }
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    return scannerTableStats;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
index e1cc6a8..bf95d19 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
@@ -25,9 +25,9 @@ import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.BaseTupleComparator;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
@@ -52,7 +52,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
   private List<Tuple> rightTupleSlots;
 
   private JoinTupleComparator joincomparator = null;
-  private TupleComparator[] tupleComparator = null;
+  private BaseTupleComparator[] tupleComparator = null;
 
   private final static int INITIAL_TUPLE_SLOT = 10000;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
index bbfe973..d68597d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
@@ -24,9 +24,9 @@ import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.BaseTupleComparator;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
@@ -53,7 +53,7 @@ public class MergeJoinExec extends BinaryPhysicalExec {
   private Iterator<Tuple> innerIterator;
 
   private JoinTupleComparator joincomparator = null;
-  private TupleComparator[] tupleComparator = null;
+  private BaseTupleComparator[] tupleComparator = null;
 
   private final static int INITIAL_TUPLE_SLOT = 10000;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PairWiseMerger.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PairWiseMerger.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PairWiseMerger.java
new file mode 100644
index 0000000..2ac8662
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PairWiseMerger.java
@@ -0,0 +1,250 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+/**
+ * Two-way merger scanner that reads two input sources and outputs one output tuples sorted in some order.
+ */
+class PairWiseMerger implements Scanner {
+  private static final Log LOG = LogFactory.getLog(PairWiseMerger.class);
+
+  private Scanner leftScan;
+  private Scanner rightScan;
+
+  private VTuple outTuple;
+  private VTuple leftTuple;
+  private VTuple rightTuple;
+
+  private final Schema schema;
+  private final Comparator<Tuple> comparator;
+
+  private float mergerProgress;
+  private TableStats mergerInputStats;
+
+  private static enum State {
+    NEW,
+    INITED,
+    CLOSED
+  }
+
+  private State state = State.NEW;
+
+  public PairWiseMerger(Schema schema, Scanner leftScanner, Scanner rightScanner, Comparator<Tuple> comparator)
+      throws IOException {
+    this.schema = schema;
+    this.leftScan = leftScanner;
+    this.rightScan = rightScanner;
+    this.comparator = comparator;
+  }
+
+  private void setState(State state) {
+    this.state = state;
+  }
+
+  @Override
+  public void init() throws IOException {
+    if (state == State.NEW) {
+      leftScan.init();
+      rightScan.init();
+
+      prepareTuplesForFirstComparison();
+
+      mergerInputStats = new TableStats();
+      mergerProgress = 0.0f;
+
+      setState(State.INITED);
+    } else {
+      throw new IllegalStateException("Illegal State: init() is not allowed in " + state.name());
+    }
+  }
+
+  private void prepareTuplesForFirstComparison() throws IOException {
+    Tuple lt = leftScan.next();
+    if (lt != null) {
+      leftTuple = new VTuple(lt);
+    } else {
+      leftTuple = null; // TODO - missed free
+    }
+
+    Tuple rt = rightScan.next();
+    if (rt != null) {
+      rightTuple = new VTuple(rt);
+    } else {
+      rightTuple = null; // TODO - missed free
+    }
+  }
+
+  public Tuple next() throws IOException {
+
+    if (leftTuple != null && rightTuple != null) {
+      if (comparator.compare(leftTuple, rightTuple) < 0) {
+        outTuple = new VTuple(leftTuple);
+
+        Tuple lt = leftScan.next();
+        if (lt != null) {
+          leftTuple = new VTuple(lt);
+        } else {
+          leftTuple = null; // TODO - missed free
+        }
+      } else {
+        outTuple = new VTuple(rightTuple);
+
+        Tuple rt = rightScan.next();
+        if (rt != null) {
+          rightTuple = new VTuple(rt);
+        } else {
+          rightTuple = null; // TODO - missed free
+        }
+      }
+      return outTuple;
+    }
+
+    if (leftTuple == null) {
+      if (rightTuple != null) {
+        outTuple = new VTuple(rightTuple);
+      } else {
+        outTuple = null;
+      }
+
+      Tuple rt = rightScan.next();
+      if (rt != null) {
+        rightTuple = new VTuple(rt);
+      } else {
+        rightTuple = null; // TODO - missed free
+      }
+    } else {
+      if (leftTuple != null) {
+        outTuple = new VTuple(leftTuple);
+      } else {
+        outTuple = null;
+      }
+
+      Tuple lt = leftScan.next();
+      if (lt != null) {
+        leftTuple = new VTuple(lt);
+      } else {
+        leftTuple = null; // TODO - missed free
+      }
+    }
+    return outTuple;
+  }
+
+  @Override
+  public void reset() throws IOException {
+    if (state == State.INITED) {
+      leftScan.reset();
+      rightScan.reset();
+
+      outTuple = null;
+      leftTuple = null;
+      rightTuple = null;
+
+      prepareTuplesForFirstComparison();
+    } else {
+      throw new IllegalStateException("Illegal State: init() is not allowed in " + state.name());
+    }
+  }
+
+  public void close() throws IOException {
+    IOUtils.cleanup(PairWiseMerger.LOG, leftScan, rightScan);
+    getInputStats();
+    leftScan = null;
+    rightScan = null;
+    mergerProgress = 1.0f;
+    setState(State.CLOSED);
+  }
+
+  @Override
+  public boolean isProjectable() {
+    return false;
+  }
+
+  @Override
+  public void setTarget(Column[] targets) {
+  }
+
+  @Override
+  public boolean isSelectable() {
+    return false;
+  }
+
+  @Override
+  public void setSearchCondition(Object expr) {
+  }
+
+  @Override
+  public boolean isSplittable() {
+    return false;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return schema;
+  }
+
+  @Override
+  public float getProgress() {
+    if (leftScan == null) {
+      return mergerProgress;
+    }
+    return leftScan.getProgress() * 0.5f + rightScan.getProgress() * 0.5f;
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    if (leftScan == null) {
+      return mergerInputStats;
+    }
+    TableStats leftInputStats = leftScan.getInputStats();
+    if (mergerInputStats == null) {
+      mergerInputStats = new TableStats();
+    }
+    mergerInputStats.setNumBytes(0);
+    mergerInputStats.setReadBytes(0);
+    mergerInputStats.setNumRows(0);
+
+    if (leftInputStats != null) {
+      mergerInputStats.setNumBytes(leftInputStats.getNumBytes());
+      mergerInputStats.setReadBytes(leftInputStats.getReadBytes());
+      mergerInputStats.setNumRows(leftInputStats.getNumRows());
+    }
+
+    TableStats rightInputStats = rightScan.getInputStats();
+    if (rightInputStats != null) {
+      mergerInputStats.setNumBytes(mergerInputStats.getNumBytes() + rightInputStats.getNumBytes());
+      mergerInputStats.setReadBytes(mergerInputStats.getReadBytes() + rightInputStats.getReadBytes());
+      mergerInputStats.setNumRows(mergerInputStats.getNumRows() + rightInputStats.getNumRows());
+    }
+
+    return mergerInputStats;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index 68379d1..786c726 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -47,7 +47,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
   private Schema keySchema;
 
   private BSTIndex.BSTIndexWriter indexWriter;
-  private TupleComparator comp;
+  private BaseTupleComparator comp;
   private FileAppender appender;
   private TableMeta meta;
 
@@ -71,7 +71,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
     }
 
     BSTIndex bst = new BSTIndex(new TajoConf());
-    this.comp = new TupleComparator(keySchema, sortSpecs);
+    this.comp = new BaseTupleComparator(keySchema, sortSpecs);
     Path storeTablePath = new Path(context.getWorkDir(), "output");
     LOG.info("Output data directory: " + storeTablePath);
     this.meta = CatalogUtil.newTableMeta(context.getDataChannel() != null ?

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
index 5d4dad5..bc945de 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -26,9 +26,9 @@ import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.BaseTupleComparator;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
@@ -52,7 +52,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
   private List<Tuple> innerTupleSlots;
 
   private JoinTupleComparator joinComparator = null;
-  private TupleComparator[] tupleComparator = null;
+  private BaseTupleComparator[] tupleComparator = null;
 
   private final static int INITIAL_TUPLE_SLOT = 10000;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 2f0c12f..122d4f3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -34,7 +34,7 @@ import org.apache.tajo.engine.eval.FieldEval;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.engine.utils.TupleCache;
 import org.apache.tajo.engine.utils.TupleCacheKey;
 import org.apache.tajo.engine.utils.TupleUtil;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
index a4a8d37..e261e0c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
@@ -18,11 +18,12 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.storage.BaseTupleComparator;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
 
 import java.io.IOException;
 import java.util.Comparator;
@@ -35,7 +36,13 @@ public abstract class SortExec extends UnaryPhysicalExec {
                   Schema outSchema, PhysicalExec child, SortSpec [] sortSpecs) {
     super(context, inSchema, outSchema, child);
     this.sortSpecs = sortSpecs;
-    this.comparator = new TupleComparator(inSchema, sortSpecs);
+
+    BaseTupleComparator comp = new BaseTupleComparator(inSchema, sortSpecs);
+    if (context.getQueryContext().getBool(SessionVars.CODEGEN)) {
+      this.comparator = context.getSharedResource().getCompiledComparator(inSchema, comp);
+    } else {
+      this.comparator = comp;
+    }
   }
 
   public SortSpec[] getSortSpecs() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index fd0c04f..3199b56 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -68,10 +68,7 @@ public class StoreTableExec extends UnaryPhysicalExec {
     }
 
     PhysicalPlanUtil.setNullCharIfNecessary(context.getQueryContext(), plan, meta);
-
-    if (context.getQueryContext().containsKey(SessionVars.MAX_OUTPUT_FILE_SIZE)) {
-      maxPerFileSize = context.getQueryContext().getLong(SessionVars.MAX_OUTPUT_FILE_SIZE) * StorageUnit.MB;
-    }
+    maxPerFileSize = context.getQueryContext().getLong(SessionVars.MAX_OUTPUT_FILE_SIZE) * StorageUnit.MB;
 
     openNewFile(writtenFileNum);
     sumStats = new TableStats();

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
index 7aeed13..2945185 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
@@ -27,8 +27,8 @@ import org.apache.tajo.engine.eval.WindowFunctionEval;
 import org.apache.tajo.engine.function.FunctionContext;
 import org.apache.tajo.engine.planner.logical.WindowAggNode;
 import org.apache.tajo.engine.planner.logical.WindowSpec;
+import org.apache.tajo.storage.BaseTupleComparator;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
@@ -265,7 +265,7 @@ public class WindowAggExec extends UnaryPhysicalExec {
   }
 
   private void evaluationWindowFrame() {
-    TupleComparator comp;
+    BaseTupleComparator comp;
 
     evaluatedTuples = new ArrayList<Tuple>();
 
@@ -285,9 +285,9 @@ public class WindowAggExec extends UnaryPhysicalExec {
 
     for (int idx = 0; idx < functions.length; idx++) {
       if (orderedFuncFlags[idx]) {
-        comp = new TupleComparator(inSchema, functions[idx].getSortSpecs());
+        comp = new BaseTupleComparator(inSchema, functions[idx].getSortSpecs());
         Collections.sort(accumulatedInTuples, comp);
-        comp = new TupleComparator(schemaForOrderBy, functions[idx].getSortSpecs());
+        comp = new BaseTupleComparator(schemaForOrderBy, functions[idx].getSortSpecs());
         Collections.sort(evaluatedTuples, comp);
       }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
index ec5df04..7e26a22 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -29,7 +29,7 @@ import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock;
 import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.util.TUtil;
 
 import java.util.*;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java
deleted file mode 100644
index 981b572..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.utils;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-
-public class SchemaUtil {
-  // See TAJO-914 bug.
-  //
-  // Its essential problem is that constant value is evaluated multiple times at each scan.
-  // As a result, join nodes can take the child nodes which have the same named fields.
-  // Because current schema does not allow the same name and ignore the duplicated schema,
-  // it finally causes the in-out schema mismatch between the parent and child nodes.
-  //
-  // tmpColumnSeq is a hack to avoid the above problem by keeping duplicated constant values as different name fields.
-  // The essential solution would be https://issues.apache.org/jira/browse/TAJO-895.
-  static int tmpColumnSeq = 0;
-  public static Schema merge(Schema left, Schema right) {
-    Schema merged = new Schema();
-    for(Column col : left.getColumns()) {
-      if (!merged.containsByQualifiedName(col.getQualifiedName())) {
-        merged.addColumn(col);
-      }
-    }
-    for(Column col : right.getColumns()) {
-      if (merged.containsByQualifiedName(col.getQualifiedName())) {
-        merged.addColumn("?fake" + (tmpColumnSeq++), col.getDataType());
-      } else {
-        merged.addColumn(col);
-      }
-    }
-
-    // if overflow
-    if (tmpColumnSeq < 0) {
-      tmpColumnSeq = 0;
-    }
-    return merged;
-  }
-
-  /**
-   * Get common columns to be used as join keys of natural joins.
-   */
-  public static Schema getNaturalJoinColumns(Schema left, Schema right) {
-    Schema common = new Schema();
-    for (Column outer : left.getColumns()) {
-      if (!common.containsByName(outer.getSimpleName()) && right.containsByName(outer.getSimpleName())) {
-        common.addColumn(new Column(outer.getSimpleName(), outer.getDataType()));
-      }
-    }
-    
-    return common;
-  }
-
-  public static Schema getQualifiedLogicalSchema(TableDesc tableDesc, String tableName) {
-    Schema logicalSchema = new Schema(tableDesc.getLogicalSchema());
-    if (tableName != null) {
-      logicalSchema.setQualifier(tableName);
-    }
-    return logicalSchema;
-  }
-
-  public static <T extends Schema> T clone(Schema schema) {
-    try {
-      T copy = (T) schema.clone();
-      return copy;
-    } catch (CloneNotSupportedException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
index e77e265..4b02777 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
@@ -30,6 +30,8 @@ import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.planner.PlanningException;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.storage.BaseTupleComparator;
+import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.util.Pair;
 
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -85,10 +87,10 @@ public class ExecutionBlockSharedResource {
   }
 
   public EvalNode compileEval(Schema schema, EvalNode eval) {
-    return compilationContext.getCompiler().compile(schema, eval);
+    return compilationContext.getEvalCompiler().compile(schema, eval);
   }
 
-  public EvalNode getPreCompiledEval(Schema schema, EvalNode eval) {
+  public EvalNode getCompiledEval(Schema schema, EvalNode eval) {
     if (codeGenEnabled) {
 
       Pair<Schema, EvalNode> key = new Pair<Schema, EvalNode>(schema, eval);
@@ -96,7 +98,7 @@ public class ExecutionBlockSharedResource {
         return compilationContext.getPrecompiedEvals().get(key);
       } else {
         try {
-          LOG.warn(eval.toString() + " does not exists. Immediately compile it: " + eval);
+          LOG.warn(eval.toString() + " does not exist. Compiling it immediately.");
           return compileEval(schema, eval);
         } catch (Throwable t) {
           LOG.warn(t);
@@ -104,10 +106,37 @@ public class ExecutionBlockSharedResource {
         }
       }
     } else {
-      throw new IllegalStateException("CodeGen is disabled");
+      throw new IllegalStateException("CODEGEN is disabled");
     }
   }
 
+  public TupleComparator compileComparator(Schema schema, BaseTupleComparator comp) {
+    return compilationContext.getComparatorCompiler().compile(comp, false);
+  }
+
+  public TupleComparator getCompiledComparator(Schema schema, BaseTupleComparator comp) {
+    if (codeGenEnabled) {
+      Pair<Schema, BaseTupleComparator> key = new Pair<Schema, BaseTupleComparator>(schema, comp);
+      if (compilationContext.getPrecompiedComparators().containsKey(key)) {
+        return compilationContext.getPrecompiedComparators().get(key);
+      } else {
+        try {
+          LOG.warn(comp + " does not exist. Compiling it immediately");
+          return compileComparator(schema, comp);
+        } catch (Throwable t) {
+          LOG.warn(t);
+          return comp;
+        }
+      }
+    } else {
+      throw new IllegalStateException("CODEGEN is disabled");
+    }
+  }
+
+  public TajoClassLoader getClassLoader() {
+    return classLoader;
+  }
+
   public void release() {
     compilationContext = null;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
index be33a12..2b58196 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
@@ -26,10 +26,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.BaseTupleComparator;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.index.bst.BSTIndex;
 import org.apache.tajo.worker.dataserver.retriever.FileChunk;
 import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler;
@@ -57,10 +57,10 @@ public class RangeRetrieverHandler implements RetrieverHandler {
   private final File file;
   private final BSTIndex.BSTIndexReader idxReader;
   private final Schema schema;
-  private final TupleComparator comp;
+  private final BaseTupleComparator comp;
   private final RowStoreDecoder decoder;
 
-  public RangeRetrieverHandler(File outDir, Schema schema, TupleComparator comp) throws IOException {
+  public RangeRetrieverHandler(File outDir, Schema schema, BaseTupleComparator comp) throws IOException {
     this.file = outDir;
     BSTIndex index = new BSTIndex(new TajoConf());
     this.schema = schema;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 7b4cbe1..d2cb60d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -49,8 +49,9 @@ import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol.*;
 import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
 import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.storage.BaseTupleComparator;
+import org.apache.tajo.storage.RawFile;
 import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 
@@ -94,7 +95,9 @@ public class Task {
   // TODO - to be refactored
   private ShuffleType shuffleType = null;
   private Schema finalSchema = null;
-  private TupleComparator sortComp = null;
+
+  private BaseTupleComparator sortComp = null;
+  private ClientSocketChannelFactory channelFactory = null;
 
   static final String OUTPUT_FILE_PREFIX="part-";
   static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY =
@@ -175,7 +178,7 @@ public class Task {
       if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
         SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
         this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
-        this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys());
+        this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
       }
     } else {
       // The final result of a task will be written in a file named part-ss-nnnnnnn,
@@ -685,7 +688,7 @@ public class Task {
           if (!storeDir.exists()) {
             storeDir.mkdirs();
           }
-          storeFile = new File(storeDir, "in_" + i);
+          storeFile = new File(storeDir, "in_" + i + "." + RawFile.FILE_EXTENSION);
           Fetcher fetcher = new Fetcher(systemConf, uri, storeFile, channelFactory);
           runnerList.add(fetcher);
           i++;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 422ec2b..4973ff8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -133,6 +133,12 @@ public class TaskAttemptContext {
                             final Fragment [] fragments,  final Path workDir) {
     this(queryContext, null, queryId, FragmentConvertor.toFragmentProtoArray(fragments), workDir);
   }
+  @VisibleForTesting
+  public TaskAttemptContext(final QueryContext queryContext, final QueryUnitAttemptId queryId,
+                            final Fragment [] fragments,  final Path workDir, ExecutionBlockSharedResource resource) {
+    this(queryContext, null, queryId, FragmentConvertor.toFragmentProtoArray(fragments), workDir);
+    this.sharedResource = resource;
+  }
 
   public TajoConf getConf() {
     return queryContext.getConf();
@@ -167,13 +173,9 @@ public class TaskAttemptContext {
     return sharedResource;
   }
 
-  public EvalNode compileEval(Schema schema, EvalNode eval) {
-    return sharedResource.compileEval(schema, eval);
-  }
-
   public EvalNode getPrecompiledEval(Schema schema, EvalNode eval) {
     if (sharedResource != null) {
-      return sharedResource.getPreCompiledEval(schema, eval);
+      return sharedResource.getCompiledEval(schema, eval);
     } else {
       LOG.debug("Shared resource is not initialized. It is NORMAL in unit tests");
       return eval;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/codegen/TestTupleComparerCompiler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/codegen/TestTupleComparerCompiler.java b/tajo-core/src/test/java/org/apache/tajo/engine/codegen/TestTupleComparerCompiler.java
new file mode 100644
index 0000000..6d9b135
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/codegen/TestTupleComparerCompiler.java
@@ -0,0 +1,352 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.codegen;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.BaseTupleComparator;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.tuple.offheap.*;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.tajo.common.TajoDataTypes.Type.*;
+import static org.apache.tajo.tuple.offheap.TestOffHeapRowBlock.schema;
+import static org.junit.Assert.assertTrue;
+
+public class TestTupleComparerCompiler {
+  private static TajoClassLoader classLoader;
+  private static TupleComparerCompiler compiler;
+
+  @BeforeClass
+  public static void setUp() {
+    classLoader = new TajoClassLoader();
+    compiler = new TupleComparerCompiler(classLoader);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Throwable {
+    compiler = null;
+
+    classLoader.clean();
+    classLoader = null;
+  }
+
+  private SortSpec [][] createSortSpecs(String columnName) {
+    Column column = schema.getColumn(columnName);
+    SortSpec [][] sortSpecList = new SortSpec[4][];
+    sortSpecList[0] = new SortSpec[] {new SortSpec(column, true, false)};
+    sortSpecList[1] = new SortSpec[] {new SortSpec(column, true, true)};
+    sortSpecList[2] = new SortSpec[] {new SortSpec(column, false, false)};
+    sortSpecList[3] = new SortSpec[] {new SortSpec(column, false, true)};
+    return sortSpecList;
+  }
+
+  private TupleComparator [] createComparators(SortSpec [][] sortSpecs, boolean unsafeTuple) {
+    TupleComparator [] comps = new TupleComparator[sortSpecs.length];
+    for (int i = 0; i < sortSpecs.length; i++) {
+      BaseTupleComparator compImpl = new BaseTupleComparator(schema, sortSpecs[i]);
+      comps[i] = compiler.compile(compImpl, unsafeTuple);
+    }
+
+    return comps;
+  }
+
+  private void assertCompareAll(TupleComparator [] comps, SortSpec [][] sortSpecs, Tuple...tuples) {
+    Preconditions.checkArgument(comps.length == sortSpecs.length);
+    Preconditions.checkArgument(tuples.length == 5, "The number of tuples must be 5");
+
+    for (int i = 0; i < comps.length; i++) {
+      assertCompare(comps[i], sortSpecs[i], tuples);
+    }
+  }
+
+  /**
+   * First two tuples must be the same values for equality check.
+   * @param tuples
+   */
+  private void assertCompare(TupleComparator comp, SortSpec [] sortSpecs, Tuple...tuples) {
+    Preconditions.checkArgument(tuples.length == 5, "The number of tuples must be 5");
+
+    if (sortSpecs[0].isAscending()) {
+      assertTrue("Checking Equality", comp.compare(tuples[0], tuples[1]) == 0);
+      assertTrue("Checking Less Than", comp.compare(tuples[0], tuples[2]) < 0);
+      assertTrue("Checking Greater Than", comp.compare(tuples[2], tuples[0]) > 0);
+    } else {
+      assertTrue("Checking Equality", comp.compare(tuples[0], tuples[1]) == 0);
+      assertTrue("Checking Less Than", comp.compare(tuples[0], tuples[2]) > 0);
+      assertTrue("Checking Greater Than", comp.compare(tuples[2], tuples[0]) < 0);
+    }
+
+    if (sortSpecs[0].isNullFirst()) {
+      assertTrue("Checking Greater Than", comp.compare(tuples[0], tuples[3]) > 0);
+      assertTrue("Checking Greater Than", comp.compare(tuples[3], tuples[0]) < 0);
+    } else {
+      assertTrue("Checking Greater Than", comp.compare(tuples[0], tuples[3]) < 0);
+      assertTrue("Checking Greater Than", comp.compare(tuples[3], tuples[0]) > 0);
+    }
+
+    assertTrue("Checking Null Equality", comp.compare(tuples[3], tuples[4]) == 0);
+    assertTrue("Checking Null Equality", comp.compare(tuples[4], tuples[3]) == 0);
+  }
+
+  @Test
+  public void testCompareOneBool() throws Exception {
+    SortSpec [][] sortSpecs = createSortSpecs("col0");
+    TupleComparator [] comps = createComparators(sortSpecs, false);
+
+    Tuple t1 = new VTuple(schema.size());
+    t1.put(0, DatumFactory.createBool(true));
+
+    Tuple t2 = new VTuple(schema.size());
+    t2.put(0, DatumFactory.createBool(true));
+
+    Tuple t3 = new VTuple(schema.size());
+    t3.put(0, DatumFactory.createBool(false));
+
+    Tuple t4 = new VTuple(schema.size());
+    t4.put(0, NullDatum.get());
+
+    assertCompareAll(comps, sortSpecs, t1, t2, t3, t4, t4);
+  }
+
+  @Test
+  public void testCompareOneInt() throws Exception {
+    SortSpec [][] sortSpecs = createSortSpecs("col2");
+    TupleComparator [] comps = createComparators(sortSpecs, false);
+
+    Tuple t1 = new VTuple(schema.size());
+    t1.put(2, DatumFactory.createInt2((short) 1));
+
+    Tuple t2 = new VTuple(schema.size());
+    t2.put(2, DatumFactory.createInt2((short) 1));
+
+    Tuple t3 = new VTuple(schema.size());
+    t3.put(2, DatumFactory.createInt2((short) 2));
+
+    Tuple t4 = new VTuple(schema.size());
+    t4.put(2, NullDatum.get());
+
+    Tuple t5 = new VTuple(schema.size());
+    t5.put(2, NullDatum.get());
+
+    assertCompareAll(comps, sortSpecs, t1, t2, t3, t4, t5);
+  }
+
+  @Test
+  public void testCompareTwoInts() throws Exception {
+    SortSpec[] sortSpecs = new SortSpec[] {
+        new SortSpec(new Column("col2", INT2)),
+        new SortSpec(new Column("col3", INT4))};
+
+
+    BaseTupleComparator comparator = new BaseTupleComparator(schema, sortSpecs);
+
+    TajoClassLoader classLoader = new TajoClassLoader();
+
+    TupleComparerCompiler compiler = new TupleComparerCompiler(classLoader);
+    TupleComparator compiled = compiler.compile(comparator, false);
+
+    Tuple t1 = new VTuple(schema.size());
+    t1.put(2, DatumFactory.createInt2((short) 1));
+    t1.put(3, DatumFactory.createInt4(1));
+
+    Tuple t2 = new VTuple(schema.size());
+    t2.put(2, DatumFactory.createInt2((short) 1));
+    t2.put(3, DatumFactory.createInt4(1));
+
+    Tuple t3 = new VTuple(schema.size());
+    t3.put(2, DatumFactory.createInt2((short) 2));
+    t3.put(3, DatumFactory.createInt4(1));
+
+    Tuple t4 = new VTuple(schema.size());
+    t4.put(2, DatumFactory.createInt2((short) 1));
+    t4.put(3, DatumFactory.createInt4(2));
+
+    Tuple t5 = new VTuple(schema.size());
+    t5.put(2, NullDatum.get());
+    t5.put(3, DatumFactory.createInt4(2));
+
+    Tuple t6 = new VTuple(schema.size());
+    t6.put(2, DatumFactory.createInt2((short) 1));
+    t6.put(3, NullDatum.get());
+
+    assertCompare(compiled, sortSpecs, t1, t2, t3, t5, t5);
+    assertCompare(compiled, sortSpecs, t1, t2, t4, t5, t5);
+    assertCompare(compiled, sortSpecs, t1, t2, t5, t6, t6);
+  }
+
+  @Test
+  public void testCompareOneFloat4() throws Exception {
+    SortSpec [][] sortSpecs = createSortSpecs("col4");
+    TupleComparator comps [] = createComparators(sortSpecs, false);
+
+    Tuple t1 = new VTuple(schema.size());
+    t1.put(4, DatumFactory.createFloat4(1.0f));
+
+    Tuple t2 = new VTuple(schema.size());
+    t2.put(4, DatumFactory.createFloat4(1.0f));
+
+    Tuple t3 = new VTuple(schema.size());
+    t3.put(4, DatumFactory.createFloat4(2.0f));
+
+    Tuple t4 = new VTuple(schema.size());
+    t4.put(4, NullDatum.get());
+
+    assertCompareAll(comps, sortSpecs, t1, t2, t3, t4, t4);
+  }
+
+  @Test
+  public void testCompareFloat4Float8() throws Exception {
+    SortSpec[] sortSpecs = new SortSpec[] {
+        new SortSpec(new Column("col4", FLOAT4)),
+        new SortSpec(new Column("col5", FLOAT8))};
+
+    BaseTupleComparator comparator = new BaseTupleComparator(schema, sortSpecs);
+
+    TajoClassLoader classLoader = new TajoClassLoader();
+    TupleComparerCompiler compiler = new TupleComparerCompiler(classLoader);
+    TupleComparator compiled = compiler.compile(comparator, false);
+
+    Tuple t1 = new VTuple(schema.size());
+    t1.put(4, DatumFactory.createFloat4(1.0f));
+    t1.put(5, DatumFactory.createFloat8(1.0f));
+
+    Tuple t2 = new VTuple(schema.size());
+    t2.put(4, DatumFactory.createFloat4(1.0f));
+    t2.put(5, DatumFactory.createFloat8(1.0f));
+
+    Tuple t3 = new VTuple(schema.size());
+    t3.put(4, DatumFactory.createFloat4(1.0f));
+    t3.put(5, DatumFactory.createFloat8(2.0f));
+
+    Tuple t4 = new VTuple(schema.size());
+    t4.put(4, DatumFactory.createFloat4(2.0f));
+    t4.put(5, DatumFactory.createFloat8(1.0f));
+
+    Tuple t5 = new VTuple(schema.size());
+    t5.put(4, DatumFactory.createFloat4(2.0f));
+    t5.put(5, NullDatum.get());
+
+    Tuple t6 = new VTuple(schema.size());
+    t6.put(4, NullDatum.get());
+    t6.put(5, DatumFactory.createFloat8(1.0f));
+
+    assertCompare(compiled, sortSpecs, t1, t2, t3, t5, t5);
+    assertCompare(compiled, sortSpecs, t1, t2, t4, t5, t5);
+    assertCompare(compiled, sortSpecs, t1, t2, t5, t6, t6);
+  }
+
+  @Test
+  public void testCompareText() throws Exception {
+    SortSpec [][] sortSpecs = createSortSpecs("col6");
+    TupleComparator [] comps = createComparators(sortSpecs, false);
+
+    Tuple t1 = new VTuple(schema.size());
+    t1.put(6, DatumFactory.createText("tajo"));
+
+    Tuple t2 = new VTuple(schema.size());
+    t2.put(6, DatumFactory.createText("tajo"));
+
+    Tuple t3 = new VTuple(schema.size());
+    t3.put(6, DatumFactory.createText("tazo"));
+
+    Tuple t4 = new VTuple(schema.size());
+    t4.put(6, NullDatum.get());
+
+    assertCompareAll(comps, sortSpecs, t1, t2, t3, t4, t4);
+  }
+
+  @Test
+  public void testCompareTextWithNull() throws Exception {
+    SortSpec[] sortSpecs = new SortSpec[] {
+        new SortSpec(new Column("col5", FLOAT8)),
+        new SortSpec(new Column("col6", TEXT))};
+    BaseTupleComparator compImpl = new BaseTupleComparator(schema, sortSpecs);
+    TupleComparator comp = compiler.compile(compImpl, false);
+
+    Tuple t1 = new VTuple(schema.size());
+    t1.put(5, NullDatum.get());
+    t1.put(6, DatumFactory.createText("ARGENTINA"));
+
+    Tuple t2 = new VTuple(schema.size());
+    t2.put(5, NullDatum.get());
+    t2.put(6, DatumFactory.createText("ARGENTINA"));
+
+    Tuple t3 = new VTuple(schema.size());
+    t3.put(5, NullDatum.get());
+    t3.put(6, DatumFactory.createText("CANADA"));
+
+    Tuple t4 = new VTuple(schema.size());
+    t4.put(5, NullDatum.get());
+    t4.put(6, NullDatum.get());
+
+    assertCompare(comp, sortSpecs, t1, t2, t3, t4, t4);
+  }
+
+  private void fillTextColumnToRowBlock(OffHeapRowBlock rowBlock, String text) {
+    RowWriter writer = rowBlock.getWriter();
+    writer.startRow();
+    writer.skipField(); // 0
+    writer.skipField(); // 1
+    writer.skipField(); // 2
+    writer.skipField(); // 3
+    writer.skipField(); // 4
+    writer.skipField(); // 5
+    if (text != null) {
+      writer.putText(text);
+    }
+    writer.endRow();
+  }
+
+  @Test
+  public void testCompareTextInUnSafeTuple() throws Exception {
+    SortSpec [][] sortSpecs = createSortSpecs("col6");
+    TupleComparator [] comps = createComparators(sortSpecs, true);
+
+    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 640);
+    fillTextColumnToRowBlock(rowBlock, "tajo");
+    fillTextColumnToRowBlock(rowBlock, "tajo");
+    fillTextColumnToRowBlock(rowBlock, "tazo");
+    fillTextColumnToRowBlock(rowBlock, null);
+
+    List<UnSafeTuple> tuples = Lists.newArrayList();
+
+    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+
+    reader.reset();
+    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
+    while(reader.next(zcTuple)) {
+      tuples.add(zcTuple);
+      zcTuple = new ZeroCopyTuple();
+    }
+
+    assertCompareAll(comps, sortSpecs, tuples.get(0), tuples.get(1), tuples.get(2), tuples.get(3), tuples.get(3));
+    rowBlock.release();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index 4cbdddd..9964a38 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@ -40,7 +40,7 @@ import org.apache.tajo.engine.plan.EvalTreeProtoSerializer;
 import org.apache.tajo.engine.plan.proto.PlanProto;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.storage.LazyTuple;
 import org.apache.tajo.storage.Tuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index b370be7..7116905 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@ -31,8 +31,8 @@ import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.function.builtin.SumInt;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.storage.BaseTupleComparator;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.KeyValueSet;
@@ -287,7 +287,7 @@ public class TestPlannerUtil {
     FieldEval f4 = new FieldEval("people.fid2", CatalogUtil.newSimpleDataType(Type.INT4));
 
     EvalNode joinQual = new BinaryEval(EvalType.EQUAL, f1, f2);
-    TupleComparator [] comparators = PlannerUtil.getComparatorsFromJoinQual(joinQual, outerSchema, innerSchema);
+    BaseTupleComparator[] comparators = PlannerUtil.getComparatorsFromJoinQual(joinQual, outerSchema, innerSchema);
 
     Tuple t1 = new VTuple(2);
     t1.put(0, DatumFactory.createInt4(1));
@@ -297,11 +297,11 @@ public class TestPlannerUtil {
     t2.put(0, DatumFactory.createInt4(2));
     t2.put(1, DatumFactory.createInt4(3));
 
-    TupleComparator outerComparator = comparators[0];
+    BaseTupleComparator outerComparator = comparators[0];
     assertTrue(outerComparator.compare(t1, t2) < 0);
     assertTrue(outerComparator.compare(t2, t1) > 0);
 
-    TupleComparator innerComparator = comparators[1];
+    BaseTupleComparator innerComparator = comparators[1];
     assertTrue(innerComparator.compare(t1, t2) < 0);
     assertTrue(innerComparator.compare(t2, t1) > 0);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
index 2294424..12f8892 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
@@ -22,8 +22,8 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.BaseTupleComparator;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.VTuple;
 import org.junit.Test;
@@ -483,7 +483,7 @@ public class TestUniformRangePartition {
     TupleRange expected = new TupleRange(sortSpecs, s, e);
 
     UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
-    TupleComparator comp = new TupleComparator(schema, sortSpecs);
+    BaseTupleComparator comp = new BaseTupleComparator(schema, sortSpecs);
 
     Tuple tuple = s;
     Tuple prevTuple = null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index f817776..5b4c696 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -68,7 +68,7 @@ public class TestBSTIndexExec {
   private LogicalOptimizer optimizer;
   private AbstractStorageManager sm;
   private Schema idxSchema;
-  private TupleComparator comp;
+  private BaseTupleComparator comp;
   private BSTIndex.BSTIndexWriter writer;
   private HashMap<Integer , Integer> randomValues ;
   private int rndKey = -1;
@@ -104,7 +104,7 @@ public class TestBSTIndexExec {
     idxSchema.addColumn("managerid", Type.INT4);
     SortSpec[] sortKeys = new SortSpec[1];
     sortKeys[0] = new SortSpec(idxSchema.getColumn("managerid"), true, false);
-    this.comp = new TupleComparator(idxSchema, sortKeys);
+    this.comp = new BaseTupleComparator(idxSchema, sortKeys);
 
     this.writer = new BSTIndex(conf).getIndexWriter(idxPath,
         BSTIndex.TWO_LEVEL_INDEX, this.idxSchema, this.comp);

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index e7aac3c..eb02bfc 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -18,8 +18,13 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.algebra.Expr;
@@ -27,29 +32,38 @@ import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlockUtils;
+import org.apache.tajo.tuple.offheap.TestOffHeapRowBlock;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.raw.TestDirectRawFile;
+import org.apache.tajo.storage.rawfile.DirectRawFileScanner;
+import org.apache.tajo.storage.rawfile.DirectRawFileWriter;
 import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.worker.ExecutionBlockSharedResource;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.Random;
+import java.util.List;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.tuple.offheap.TestOffHeapRowBlock.schema;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class TestExternalSortExec {
+  private static final Log LOG = LogFactory.getLog(TestExternalSortExec.class);
+
   private TajoConf conf;
   private TajoTestingCluster util;
   private final String TEST_PATH = "target/test-data/TestExternalSortExec";
@@ -60,8 +74,6 @@ public class TestExternalSortExec {
   private Path testDir;
 
   private final int numTuple = 100000;
-  private Random rnd = new Random(System.currentTimeMillis());
-
   private TableDesc employee;
 
   @Before
@@ -75,30 +87,22 @@ public class TestExternalSortExec {
     conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString());
     sm = StorageManagerFactory.getStorageManager(conf, testDir);
 
-    Schema schema = new Schema();
-    schema.addColumn("managerid", Type.INT4);
-    schema.addColumn("empid", Type.INT4);
-    schema.addColumn("deptname", Type.TEXT);
-
-    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
-    Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
-    appender.enableStats();
-    appender.init();
-    Tuple tuple = new VTuple(schema.size());
-    for (int i = 0; i < numTuple; i++) {
-      tuple.put(new Datum[] {
-          DatumFactory.createInt4(rnd.nextInt(50)),
-          DatumFactory.createInt4(rnd.nextInt(100)),
-          DatumFactory.createText("dept_" + i),
-      });
-      appender.addTuple(tuple);
-    }
-    appender.flush();
-    appender.close();
+    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(numTuple);
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.DIRECTRAW);
+
+    Path outFile = new Path(TEST_PATH, "output.draw");
+
+    long startTime = System.currentTimeMillis();
+    Path employeePath = TestDirectRawFile.writeRowBlock(conf, employeeMeta, rowBlock, outFile);
+    long endTime = System.currentTimeMillis();
+    rowBlock.release();
 
-    System.out.println(appender.getStats().getNumRows() + " rows (" + (appender.getStats().getNumBytes() / 1048576) +
-        " MB)");
+    FileSystem fs = FileSystem.getLocal(conf);
+    FileStatus status = fs.getFileStatus(employeePath);
+    LOG.info("============================================================");
+    LOG.info("Written file size: " + FileUtil.humanReadableByteCount(status.getLen(), false) + " " +
+        (endTime - startTime) + " msec");
+    LOG.info("============================================================");
 
     employee = new TableDesc("default.employee", schema, employeeMeta, employeePath);
     catalog.createTable(employee);
@@ -113,35 +117,31 @@ public class TestExternalSortExec {
   }
 
   String[] QUERIES = {
-      "select managerId, empId from employee order by managerId, empId"
+      "select col2, col3, col4 from employee order by col2, col3"
   };
 
   @Test
-  public final void testNext() throws IOException, PlanningException {
+  public final void testNext() throws IOException, PlanningException, InterruptedException {
+    QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf);
+    queryContext.setBool(SessionVars.CODEGEN, true);
+    queryContext.setLong(SessionVars.EXTSORT_BUFFER_SIZE, 100);
+
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(queryContext, expr);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    ExecutionBlockSharedResource resource = new ExecutionBlockSharedResource();
+    resource.initialize(queryContext, rootNode.toJson());
+
     FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
-    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
-        LocalTajoTestingUtility.newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir);
-    ctx.setEnforcer(new Enforcer());
-    Expr expr = analyzer.parse(QUERIES[0]);
-    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
-    LogicalNode rootNode = plan.getRootBlock().getRoot();
+    TaskAttemptContext taskContext = new TaskAttemptContext(queryContext,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir, resource);
+    taskContext.setEnforcer(new Enforcer());
 
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
-    
-    ProjectionExec proj = (ProjectionExec) exec;
-
-    // TODO - should be planed with user's optimization hint
-    if (!(proj.getChild() instanceof ExternalSortExec)) {
-      UnaryPhysicalExec sortExec = proj.getChild();
-      SeqScanExec scan = sortExec.getChild();
-
-      ExternalSortExec extSort = new ExternalSortExec(ctx, sm,
-          ((MemSortExec)sortExec).getPlan(), scan);
-      proj.setChild(extSort);
-    }
+    PhysicalExec exec = phyPlanner.createPlan(taskContext, rootNode);
 
     Tuple tuple;
     Tuple preVal = null;
@@ -149,10 +149,10 @@ public class TestExternalSortExec {
     int cnt = 0;
     exec.init();
     long start = System.currentTimeMillis();
-    TupleComparator comparator = new TupleComparator(proj.getSchema(),
+    BaseTupleComparator comparator = new BaseTupleComparator(exec.getSchema(),
         new SortSpec[]{
-            new SortSpec(new Column("managerid", Type.INT4)),
-            new SortSpec(new Column("empid", Type.INT4))
+            new SortSpec(new Column("col2", Type.INT4)),
+            new SortSpec(new Column("col3", Type.INT8))
         });
 
     while ((tuple = exec.next()) != null) {
@@ -180,6 +180,26 @@ public class TestExternalSortExec {
     }
     assertEquals(numTuple, cnt);
     exec.close();
-    System.out.println("Sort Time: " + (end - start) + " msc");
+    System.out.println("Sort and final write time: " + (end - start) + " msc");
+  }
+
+  public static DirectRawFileScanner createSortedScanner(TajoConf conf, TableMeta meta, int rowNum,
+                                                         BaseTupleComparator comparator)
+      throws IOException {
+    Path testDir = CommonTestingUtil.getTestDir();
+    Path outFile = new Path(testDir, "file1.out");
+
+    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(rowNum);
+
+    List<Tuple> tupleList = OffHeapRowBlockUtils.sort(rowBlock, comparator);
+
+    DirectRawFileWriter writer1 = new DirectRawFileWriter(conf, schema, meta, outFile);
+    writer1.init();
+    for (Tuple t:tupleList) {
+      writer1.addTuple(t);
+    }
+    writer1.close();
+
+    return new DirectRawFileScanner(conf, schema, meta, outFile);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPairWiseMerger.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPairWiseMerger.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPairWiseMerger.java
new file mode 100644
index 0000000..4f87513
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPairWiseMerger.java
@@ -0,0 +1,316 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.BaseTupleComparator;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.offheap.TestOffHeapRowBlock;
+import org.apache.tajo.storage.rawfile.DirectRawFileScanner;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.tuple.offheap.TestOffHeapRowBlock.schema;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestPairWiseMerger {
+  static TajoConf conf;
+  private static BaseTupleComparator comparator;
+  static {
+    conf = new TajoConf();
+
+    comparator = new BaseTupleComparator(TestOffHeapRowBlock.schema,
+        new SortSpec[] {
+            new SortSpec(new Column("col2", TajoDataTypes.Type.INT4)),
+            new SortSpec(new Column("col3", TajoDataTypes.Type.INT8))
+        });
+  }
+
+  @Test
+  public void testPairWiseMergerWithTwoLeaves() throws IOException {
+    int [] rowNums = new int [] {500, 1000};
+
+    Scanner [] scanners = createScanners(rowNums);
+
+    PairWiseMerger merger = createLeftDeepMerger(scanners);
+
+    merger.init();
+    assertSortResult(rowNums, merger, comparator);
+    merger.reset();
+    assertSortResult(rowNums, merger, comparator);
+    merger.close();
+
+    merger = createRightDeepMerger(scanners);
+
+    merger.init();
+    assertSortResult(rowNums, merger, comparator);
+    merger.reset();
+    assertSortResult(rowNums, merger, comparator);
+    merger.close();
+  }
+
+  @Test
+  public void testPairWiseMergerWithThreeLeaves1() throws IOException {
+    int [] rowNums = new int[] {1, 1, 1};
+    Scanner [] scanners = createScanners(rowNums);
+
+    PairWiseMerger merger = createLeftDeepMerger(scanners);
+
+    merger.init();
+    assertSortResult(rowNums, merger, comparator);
+    merger.reset();
+    assertSortResult(rowNums, merger, comparator);
+    merger.close();
+
+    merger = createRightDeepMerger(scanners);
+
+    merger.init();
+    assertSortResult(rowNums, merger, comparator);
+    merger.reset();
+    assertSortResult(rowNums, merger, comparator);
+    merger.close();
+  }
+
+  @Test
+  public void testPairWiseMergerWithThreeLeaves2() throws IOException {
+    int [] rowNums = new int[] {0, 0, 1};
+    Scanner [] scanners = createScanners(rowNums);
+
+    PairWiseMerger merger = createLeftDeepMerger(scanners);
+
+    merger.init();
+    assertSortResult(rowNums, merger, comparator);
+    merger.reset();
+    assertSortResult(rowNums, merger, comparator);
+    merger.close();
+
+    merger = createRightDeepMerger(scanners);
+
+    merger.init();
+    assertSortResult(rowNums, merger, comparator);
+    merger.reset();
+    assertSortResult(rowNums, merger, comparator);
+    merger.close();
+  }
+
+  @Test
+  public void testPairWiseMergerWithThreeLeaves3() throws IOException {
+    int [] rowNums = new int[] {0, 1, 0};
+    Scanner [] scanners = createScanners(rowNums);
+
+    PairWiseMerger merger = createLeftDeepMerger(scanners);
+
+    merger.init();
+    assertSortResult(rowNums, merger, comparator);
+    merger.reset();
+    assertSortResult(rowNums, merger, comparator);
+    merger.close();
+
+    merger = createRightDeepMerger(scanners);
+
+    merger.init();
+    assertSortResult(rowNums, merger, comparator);
+    merger.reset();
+    assertSortResult(rowNums, merger, comparator);
+    merger.close();
+  }
+
+  @Test
+  public void testPairWiseMergerWithThreeLeaves4() throws IOException {
+    int [] rowNums = new int[] {1, 0, 0};
+    Scanner [] scanners = createScanners(rowNums);
+
+    PairWiseMerger merger = createLeftDeepMerger(scanners);
+
+    merger.init();
+    assertSortResult(rowNums, merger, comparator);
+    merger.reset();
+    assertSortResult(rowNums, merger, comparator);
+    merger.close();
+
+    merger = createRightDeepMerger(scanners);
+
+    merger.init();
+    assertSortResult(rowNums, merger, comparator);
+    merger.reset();
+    assertSortResult(rowNums, merger, comparator);
+    merger.close();
+  }
+
+  @Test
+  public void testPairWiseMergerWithThreeLeaves5() throws IOException {
+    int [] rowNums = new int[] {1, 0, 1};
+    Scanner [] scanners = createScanners(rowNums);
+
+    PairWiseMerger merger = createLeftDeepMerger(scanners);
+
+    merger.init();
+    assertSortResult(rowNums, merger, comparator);
+    merger.reset();
+    assertSortResult(rowNums, merger, comparator);
+    merger.close();
+
+    merger = createRightDeepMerger(scanners);
+
+    merger.init();
+    assertSortResult(rowNums, merger, comparator);
+    merger.reset();
+    assertSortResult(rowNums, merger, comparator);
+    merger.close();
+  }
+
+  @Test
+  public void testPairWiseMergerWithThreeLeaves6() throws IOException {
+    int [] rowNums = new int[] {1, 1, 0};
+
+    Scanner [] scanners = createScanners(rowNums);
+
+    PairWiseMerger merger = createLeftDeepMerger(scanners);
+
+    merger.init();
+    assertSortResult(rowNums, merger, comparator);
+    merger.reset();
+    assertSortResult(rowNums, merger, comparator);
+    merger.close();
+
+    merger = createRightDeepMerger(scanners);
+
+    merger.init();
+    assertSortResult(rowNums, merger, comparator);
+    merger.reset();
+    assertSortResult(rowNums, merger, comparator);
+    merger.close();
+  }
+
+  @Test
+  public void testPairWiseMergerWithThreeLeaves7() throws IOException {
+    int [] rowNums = new int[] {1, 0, 0};
+
+    Scanner [] scanners = createScanners(rowNums);
+
+    PairWiseMerger merger = createLeftDeepMerger(scanners);
+
+    merger.init();
+    assertSortResult(rowNums, merger, comparator);
+    merger.reset();
+    assertSortResult(rowNums, merger, comparator);
+    merger.close();
+
+    merger = createRightDeepMerger(scanners);
+
+    merger.init();
+    assertSortResult(rowNums, merger, comparator);
+    merger.reset();
+    assertSortResult(rowNums, merger, comparator);
+    merger.close();
+  }
+
+  @Test
+  public void testThreeLevelPairWiseMerger1() throws IOException {
+    int [] rowNums = new int[] {500, 501, 499, 498, 489, 450, 431, 429};
+
+    Scanner [] scanners = createScanners(rowNums);
+
+    PairWiseMerger merger = createLeftDeepMerger(scanners);
+
+    merger.init();
+    assertSortResult(rowNums, merger, comparator);
+    merger.reset();
+    assertSortResult(rowNums, merger, comparator);
+    merger.close();
+
+    merger = createRightDeepMerger(scanners);
+
+    merger.init();
+    assertSortResult(rowNums, merger, comparator);
+    merger.reset();
+    assertSortResult(rowNums, merger, comparator);
+    merger.close();
+  }
+
+  private static PairWiseMerger createLeftDeepMerger(Scanner [] scanners) throws IOException {
+    PairWiseMerger prev = null;
+    for (int i = 1; i < scanners.length; i++) {
+
+      if (i == 1) {
+        prev = new PairWiseMerger(schema, scanners[i - 1], scanners[i], comparator); // initial one
+      } else {
+        prev = new PairWiseMerger(schema, prev, scanners[i], comparator);
+      }
+    }
+
+    return prev;
+  }
+
+  private static PairWiseMerger createRightDeepMerger(Scanner [] scanners) throws IOException {
+    PairWiseMerger prev = null;
+    for (int i = 1; i < scanners.length; i++) {
+
+      if (i == 1) {
+        prev = new PairWiseMerger(schema, scanners[i - 1], scanners[i], comparator); // initial one
+      } else {
+        prev = new PairWiseMerger(schema, scanners[i], prev, comparator);
+      }
+    }
+
+    return prev;
+  }
+
+  private static  Scanner [] createScanners(int [] rowNums) throws IOException {
+    DirectRawFileScanner[] scanners = new DirectRawFileScanner[rowNums.length];
+
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.DIRECTRAW);
+    for (int i = 0; i < rowNums.length; i++) {
+      scanners[i] = TestExternalSortExec.createSortedScanner(conf, meta, rowNums[i], comparator);
+    }
+
+    assertEquals(rowNums.length, scanners.length);
+    return scanners;
+  }
+
+  private static void assertSortResult(int[] rowNums, Scanner scanner, BaseTupleComparator comparator) throws IOException {
+    Tuple tuple;
+    Tuple curVal;
+    Tuple preVal = null;
+    int idx = 0;
+    while ((tuple = scanner.next()) != null) {
+      curVal = tuple;
+      if (preVal != null) {
+        assertTrue(idx + "th, prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0);
+      }
+      preVal = curVal;
+      idx++;
+    }
+
+    int totalRowNum = 0;
+    for (int i = 0; i < rowNums.length; i++) {
+      totalRowNum += rowNums[i];
+    }
+    assertEquals(totalRowNum, idx);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 5d809f8..0987a78 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -1071,7 +1071,7 @@ public class TestPhysicalPlanner {
     keySchema.addColumn("?empId", Type.INT4);
     SortSpec[] sortSpec = new SortSpec[1];
     sortSpec[0] = new SortSpec(keySchema.getColumn(0), true, false);
-    TupleComparator comp = new TupleComparator(keySchema, sortSpec);
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpec);
     BSTIndex bst = new BSTIndex(conf);
     BSTIndex.BSTIndexReader reader = bst.getIndexReader(new Path(workDir, "output/index"),
         keySchema, comp);

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
index f649dac..8f130de 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
@@ -87,7 +87,7 @@ public class TestProgressExternalSortExec {
     schema.addColumn("empid", TajoDataTypes.Type.INT4);
     schema.addColumn("deptname", TajoDataTypes.Type.TEXT);
 
-    TableMeta employeeMeta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.RAW);
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.DIRECTRAW);
     Path employeePath = new Path(testDir, "employee.csv");
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
     appender.enableStats();
@@ -170,7 +170,7 @@ public class TestProgressExternalSortExec {
     Tuple curVal;
     int cnt = 0;
     exec.init();
-    TupleComparator comparator = new TupleComparator(proj.getSchema(),
+    BaseTupleComparator comparator = new BaseTupleComparator(proj.getSchema(),
         new SortSpec[]{
             new SortSpec(new Column("managerid", TajoDataTypes.Type.INT4)),
             new SortSpec(new Column("empid", TajoDataTypes.Type.INT4))

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index 4d4cc3d..5610a60 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -83,7 +83,7 @@ public class TestSortExec {
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, tablePath);
     appender.init();
     Tuple tuple = new VTuple(schema.size());
-    for (int i = 0; i < 100; i++) {
+    for (int i = 0; i < 100000; i++) {
       tuple.put(new Datum[] {
           DatumFactory.createInt4(rnd.nextInt(5)),
           DatumFactory.createInt4(rnd.nextInt(10)),

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
index cecb281..78cce64 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
@@ -112,7 +112,7 @@ public class TestTupleUtil {
         sortSpecs);
     TupleRange [] ranges = partitioner.partition(5);
     assertTrue(5 <= ranges.length);
-    TupleComparator comp = new TupleComparator(schema, PlannerUtil.schemaToSortSpecs(schema));
+    BaseTupleComparator comp = new BaseTupleComparator(schema, PlannerUtil.schemaToSortSpecs(schema));
     TupleRange prev = ranges[0];
     for (int i = 1; i < ranges.length; i++) {
       assertTrue(comp.compare(prev.getStart(), ranges[i].getStart()) < 0);

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 5f8efe7..ccd2cde 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -174,7 +174,7 @@ public class TestRangeRetrieverHandler {
     exec.close();
 
     Schema keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
-    TupleComparator comp = new TupleComparator(keySchema, sortSpecs);
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpecs);
     BSTIndex bst = new BSTIndex(conf);
     BSTIndex.BSTIndexReader reader = bst.getIndexReader(
         new Path(testDir, "output/index"), keySchema, comp);
@@ -298,7 +298,7 @@ public class TestRangeRetrieverHandler {
     exec.close();
 
     Schema keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
-    TupleComparator comp = new TupleComparator(keySchema, sortSpecs);
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpecs);
     BSTIndex bst = new BSTIndex(conf);
     BSTIndex.BSTIndexReader reader = bst.getIndexReader(
         new Path(testDir, "output/index"), keySchema, comp);

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml
index a0cbe9f..5a14795 100644
--- a/tajo-dist/pom.xml
+++ b/tajo-dist/pom.xml
@@ -132,6 +132,7 @@
                       run cp -r $ROOT/tajo-catalog/target/tajo-catalog-${project.version}/* .
                       run cp -r $ROOT/tajo-storage/target/tajo-storage-${project.version}/* .
                       run cp -r $ROOT/tajo-yarn-pullserver/target/tajo-yarn-pullserver-${project.version}.jar .
+                      run cp -r $ROOT/tajo-thirdparty/asm/target/tajo-thirdparty-asm-${project.version}.jar .
                       run cp -r $ROOT/tajo-core/target/tajo-core-${project.version}.jar .
                       run cp -r $ROOT/tajo-core/target/lib .
                       run cp -r ${project.basedir}/src/main/bin .

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
index 5dae67e..6c8ef5d 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
@@ -17,6 +17,7 @@ package org.apache.tajo.jdbc; /**
  */
 
 import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.IntervalDatum;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.datum.ProtobufDatum;
 import org.apache.tajo.exception.UnsupportedException;
@@ -47,7 +48,12 @@ public class MetaDataTuple implements Tuple {
 
   @Override
   public boolean isNull(int fieldid) {
-    return values.get(fieldid) == null || values.get(fieldid) instanceof NullDatum;
+    return values.get(fieldid) == null || values.get(fieldid).isNull();
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return !isNull(fieldid);
   }
 
   @Override
@@ -142,7 +148,12 @@ public class MetaDataTuple implements Tuple {
 
   @Override
   public ProtobufDatum getProtobufDatum(int fieldId) {
-    throw new UnsupportedException();
+    throw new UnsupportedException("getProtobufDatum");
+  }
+
+  @Override
+  public IntervalDatum getInterval(int fieldId) {
+    throw new UnsupportedException("getInterval");
   }
 
   @Override
@@ -157,6 +168,6 @@ public class MetaDataTuple implements Tuple {
 
   @Override
   public Datum[] getValues(){
-    throw new UnsupportedException();
+    throw new UnsupportedException("getValues");
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index fe5366e..e65d844 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -736,6 +736,13 @@
       </dependency>
       <dependency>
         <groupId>org.apache.tajo</groupId>
+        <artifactId>tajo-storage</artifactId>
+        <type>test-jar</type>
+        <scope>test</scope>
+        <version>${tajo.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tajo</groupId>
         <artifactId>tajo-yarn-pullserver</artifactId>
         <version>${tajo.version}</version>
       </dependency>