You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/12/28 07:36:18 UTC

[21/50] [abbrv] TAJO-338 - Add Query Optimization Part for Column-Partitioned Tables. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
index ed529a9..e7eaf40 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -19,26 +19,34 @@
 package org.apache.tajo.engine.utils;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.statistics.ColumnStats;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.dataserver.HttpUtil;
 
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -401,4 +409,102 @@ public class TupleUtil {
     }
     return aTuple;
   }
+
+  @SuppressWarnings("unused")
+  public static Collection<Tuple> filterTuple(Schema schema, Collection<Tuple> tupleBlock, EvalNode filterCondition) {
+    TupleBlockFilterScanner filter = new TupleBlockFilterScanner(schema, tupleBlock, filterCondition);
+    return filter.nextBlock();
+  }
+
+  private static class TupleBlockFilterScanner {
+    private EvalNode qual;
+    private EvalContext qualCtx;
+    private Iterator<Tuple> iterator;
+    private Schema schema;
+
+    public TupleBlockFilterScanner(Schema schema, Collection<Tuple> tuples, EvalNode qual) {
+      this.schema = schema;
+      this.qual = qual;
+      this.qualCtx = qual.newContext();
+      this.iterator = tuples.iterator();
+    }
+
+    public List<Tuple> nextBlock() {
+      List<Tuple> results = Lists.newArrayList();
+
+      Tuple tuple;
+      while (iterator.hasNext()) {
+        tuple = iterator.next();
+        qual.eval(qualCtx, schema, tuple);
+        if (qual.terminate(qualCtx).asBool()) {
+          results.add(tuple);
+        }
+      }
+      return results;
+    }
+  }
+
+  /**
+   * Take a look at a column partition path. A partition path consists
+   * of a table path part and column values part. This method transforms
+   * a partition path into a tuple with a given partition column schema.
+   *
+   * hdfs://192.168.0.1/tajo/warehouse/table1/col1=abc/col2=def/col3=ghi
+   *                   ^^^^^^^^^^^^^^^^^^^^^  ^^^^^^^^^^^^^^^^^^^^^^^^^^
+   *                      table path part        column values part
+   *
+   * When a file path is given, this method behaves in one of two ways depending on the
+   * beNullIfFile flag. If the flag is true, it returns NULL when the given path is a file.
+   * Otherwise, it returns a built tuple regardless of whether the path is a file or a directory.
+   *
+   * @param partitionColumnSchema The partition column schema
+   * @param partitionPath The partition path
+   * @param beNullIfFile If true, this method returns NULL when a given path is a file.
+   * @return The tuple transformed from a column values part.
+   */
+  public static Tuple buildTupleFromPartitionPath(Schema partitionColumnSchema, Path partitionPath,
+                                                  boolean beNullIfFile) {
+    int startIdx = partitionPath.toString().indexOf(getColumnPartitionPathPrefix(partitionColumnSchema));
+
+    if (startIdx == -1) { // if there is no partition column in the path
+      return null;
+    }
+    String columnValuesPart = partitionPath.toString().substring(startIdx);
+
+    String [] columnValues = columnValuesPart.split("/");
+
+    // More path segments than partition columns means that the path refers to a file.
+    if (beNullIfFile && partitionColumnSchema.getColumnNum() < columnValues.length) {
+      return null;
+    }
+
+    Tuple tuple = new VTuple(partitionColumnSchema.getColumnNum());
+    int i = 0;
+    for (; i < columnValues.length && i < partitionColumnSchema.getColumnNum(); i++) {
+      String [] parts = columnValues[i].split("=");
+      if (parts.length != 2) {
+        return null;
+      }
+      int columnId = partitionColumnSchema.getColumnIdByName(parts[0]);
+      Column keyColumn = partitionColumnSchema.getColumn(columnId);
+      tuple.put(columnId, DatumFactory.createFromString(keyColumn.getDataType(), parts[1]));
+    }
+    for (; i < partitionColumnSchema.getColumnNum(); i++) {
+      tuple.put(i, NullDatum.get());
+    }
+    return tuple;
+  }
+
+  /**
+   * Get a prefix of column partition path. For example, consider a column partition (col1, col2).
+   * Then, you will get a string 'col1='.
+   *
+   * @param partitionColumn the schema of column partition
+   * @return The first part string of column partition path.
+   */
+  private static String getColumnPartitionPathPrefix(Schema partitionColumn) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(partitionColumn.getColumn(0).getColumnName()).append("=");
+    return sb.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index d432017..b1deb43 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.engine.query.QueryUnitRequest;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
@@ -470,7 +472,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
               task.getLogicalPlan().toJson(),
               context.getQueryContext(),
               subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
-          if (!subQuery.getMasterPlan().isRoot(subQuery.getBlock())) {
+          if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
             taskAssign.setInterQuery();
           }
 
@@ -490,6 +492,19 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       LOG.debug("RackLocalAssigned: " + rackLocalAssigned + " / " + totalAssigned);
     }
 
+    private boolean checkIfInterQuery(MasterPlan masterPlan, ExecutionBlock block) {
+      if (masterPlan.isRoot(block)) {
+        return false;
+      }
+
+      ExecutionBlock parent = masterPlan.getParent(block);
+      if (masterPlan.isRoot(parent) && parent.hasUnion()) {
+        return false;
+      }
+
+      return true;
+    }
+
     public void assignToNonLeafTasks(List<TaskRequestEvent> taskRequests) {
       Iterator<TaskRequestEvent> it = taskRequests.iterator();
 
@@ -519,7 +534,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
               context.getQueryContext(),
               subQuery.getDataChannel(),
               subQuery.getBlock().getEnforcer());
-          if (!subQuery.getMasterPlan().isRoot(subQuery.getBlock())) {
+          if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
             taskAssign.setInterQuery();
           }
           for (ScanNode scan : task.getScanNodes()) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 10f42c5..4575bf8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -32,7 +32,7 @@ import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.exception.AlreadyExistsTableException;
 import org.apache.tajo.catalog.exception.NoSuchTableException;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
@@ -86,7 +86,7 @@ public class GlobalEngine extends AbstractService {
       analyzer = new SQLAnalyzer();
       converter = new HiveConverter();
       planner = new LogicalPlanner(context.getCatalog());
-      optimizer = new LogicalOptimizer();
+      optimizer = new LogicalOptimizer(context.getConf());
       verifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());
 
       hookManager = new DistributedQueryHookManager();
@@ -234,7 +234,7 @@ public class GlobalEngine extends AbstractService {
     if (!state.verified()) {
       StringBuilder sb = new StringBuilder();
       for (String error : state.getErrorMessages()) {
-        sb.append("ERROR: ").append(error).append("\n");
+        sb.append(error).append("\n");
       }
       throw new VerifyException(sb.toString());
     }
@@ -263,7 +263,7 @@ public class GlobalEngine extends AbstractService {
   }
 
   public TableDesc createTableOnPath(String tableName, Schema schema, TableMeta meta,
-                                     Path path, boolean isCreated, Partitions partitions)
+                                     Path path, boolean isCreated, PartitionDesc partitionDesc)
       throws IOException {
     if (catalog.existsTable(tableName)) {
       throw new AlreadyExistsTableException(tableName);
@@ -291,7 +291,9 @@ public class GlobalEngine extends AbstractService {
     stats.setNumBytes(totalSize);
     TableDesc desc = CatalogUtil.newTableDesc(tableName, schema, meta, path);
     desc.setStats(stats);
-    desc.setPartitions(partitions);
+    if (partitionDesc != null) {
+      desc.setPartitions(partitionDesc);
+    }
     catalog.addTable(desc);
 
     LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")");

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 863ed18..861147a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -35,8 +35,7 @@ import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.exception.NoSuchTableException;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.ipc.ClientProtos;
@@ -316,12 +315,15 @@ public class TajoMasterClientService extends AbstractService {
 
         Schema schema = new Schema(request.getSchema());
         TableMeta meta = new TableMeta(request.getMeta());
-        Partitions partitions = new Partitions(request.getPartitions());
+        PartitionDesc partitionDesc = null;
+        if (request.hasPartitions()) {
+          partitionDesc = new PartitionDesc(request.getPartitions());
+        }
 
         TableDesc desc;
         try {
           desc = context.getGlobalEngine().createTableOnPath(request.getName(), schema,
-              meta, path, false, partitions);
+              meta, path, false, partitionDesc);
         } catch (Exception e) {
           return TableResponse.newBuilder()
               .setResultCode(ResultCode.ERROR)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 828ebfa..5d717ee 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -266,7 +266,7 @@ public class QueryMasterTask extends CompositeService {
 
     CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
     LogicalPlanner planner = new LogicalPlanner(catalog);
-    LogicalOptimizer optimizer = new LogicalOptimizer();
+    LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
     Expr expr;
     if (queryContext.isHiveQueryMode()) {
       HiveConverter hiveConverter = new HiveConverter();
@@ -297,6 +297,14 @@ public class QueryMasterTask extends CompositeService {
             tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
           }
         }
+
+        scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.PARTITIONS_SCAN);
+        if(scanNodes != null) {
+          for(LogicalNode eachScanNode: scanNodes) {
+            ScanNode scanNode = (ScanNode)eachScanNode;
+            tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
+          }
+        }
       }
 
       MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 7fedd4f..03659c3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -43,10 +43,7 @@ import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.GroupbyNode;
-import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.AbstractTaskScheduler;
 import org.apache.tajo.master.TaskRunnerGroupEvent;
@@ -745,21 +742,42 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       subQuery.eventHandler.handle(event);
     }
 
+    /**
+     * It creates a number of fragments for all partitions.
+     */
+    private static Collection<FileFragment> getFragmentsFromPartitionedTable(SubQuery subQuery,
+                                                                      ScanNode scan,
+                                                                      TableDesc table) throws IOException {
+      List<FileFragment> fragments = Lists.newArrayList();
+      PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan;
+      for (Path path : partitionsScan.getInputPaths()) {
+        fragments.addAll(subQuery.getStorageManager().getSplits(
+            scan.getCanonicalName(), table.getMeta(), table.getSchema(), path));
+      }
+      partitionsScan.setInputPaths(null);
+      return fragments;
+    }
+
     private static QueryUnit [] createLeafTasks(SubQuery subQuery) throws IOException {
       ExecutionBlock execBlock = subQuery.getBlock();
       ScanNode[] scans = execBlock.getScanNodes();
       Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
-      TableMeta meta;
-      Path inputPath;
-
       ScanNode scan = scans[0];
-      TableDesc desc = subQuery.context.getTableDescMap().get(scan.getCanonicalName());
-      inputPath = desc.getPath();
-      meta = desc.getMeta();
+      TableDesc table = subQuery.context.getTableDescMap().get(scan.getCanonicalName());
+
+      Collection<FileFragment> fragments;
+      TableMeta meta = table.getMeta();
 
-      // TODO - should be change the inner directory
-      List<FileFragment> fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(),
-          inputPath);
+      // Fragments are created depending on the scan node's type. If the scan is for
+      // a partitioned table, it creates many fragments covering all partitions.
+      // Otherwise, it creates at least one fragment for the table, which may
+      // span a number of blocks or possibly consist of a number of files.
+      if (scan.getType() == NodeType.PARTITIONS_SCAN) {
+        fragments = getFragmentsFromPartitionedTable(subQuery, scan, table);
+      } else {
+        Path inputPath = table.getPath();
+        fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, table.getSchema(), inputPath);
+      }
 
       QueryUnit queryUnit;
       List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
index d95c352..0637023 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
@@ -124,7 +124,7 @@ message CreateTableRequest {
   required SchemaProto schema = 2;
   required TableProto meta = 3;
   required string path = 4;
-  optional PartitionsProto partitions = 5;
+  optional PartitionDescProto partitions = 5;
 }
 
 message DropTableRequest {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
index 8e257f2..44004d2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -380,8 +380,8 @@ public class TestTajoClient {
 
     assertFalse(client.existTable(tableName));
 
-    String sql = "create table " + tableName + " (deptname text, score int4)";
-    sql += "PARTITION BY COLUMN (deptname)";
+    String sql = "create table " + tableName + " (score int4)";
+    sql += "PARTITION BY COLUMN (deptname text)";
 
     client.updateQuery(sql);
     assertTrue(client.existTable(tableName));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index c35c786..ca1259b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@ -97,7 +97,7 @@ public class ExprTestBase {
     Schema inputSchema = null;
     if (schema != null) {
       inputSchema = (Schema) schema.clone();
-      inputSchema.setQualifier(tableName, true);
+      inputSchema.setQualifier(tableName);
 
       int targetIdx [] = new int[inputSchema.getColumnNum()];
       for (int i = 0; i < targetIdx.length; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
index 1850081..3c67068 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
@@ -169,6 +169,7 @@ public class TestEvalTreeUtil {
     "select score from people where 10 * 2 > score * 10", // 4
     "select score from people where score < 10 and 4 < score", // 5
     "select score from people where score < 10 and 4 < score and age > 5", // 6
+    "select score from people where (score > 1 and score < 3) or (7 < score and score < 10)", // 7
   };
   
   @Test
@@ -208,7 +209,7 @@ public class TestEvalTreeUtil {
   public final void testGetCNF() {
     // "select score from people where score < 10 and 4 < score "
     EvalNode node = getRootSelection(QUERIES[5]);
-    EvalNode [] cnf = EvalTreeUtil.getConjNormalForm(node);
+    EvalNode [] cnf = AlgebraicUtil.toConjunctiveNormalFormArray(node);
     
     Column col1 = new Column("people.score", TajoDataTypes.Type.INT4);
     
@@ -235,26 +236,37 @@ public class TestEvalTreeUtil {
   public final void testTransformCNF2Singleton() {
     // "select score from people where score < 10 and 4 < score "
     EvalNode node = getRootSelection(QUERIES[6]);
-    EvalNode [] cnf1 = EvalTreeUtil.getConjNormalForm(node);
+    EvalNode [] cnf1 = AlgebraicUtil.toConjunctiveNormalFormArray(node);
     assertEquals(3, cnf1.length);
     
-    EvalNode conj = EvalTreeUtil.transformCNF2Singleton(cnf1);
-    EvalNode [] cnf2 = EvalTreeUtil.getConjNormalForm(conj);
+    EvalNode conj = AlgebraicUtil.createSingletonExprFromCNF(cnf1);
+    EvalNode [] cnf2 = AlgebraicUtil.toConjunctiveNormalFormArray(conj);
     
     Set<EvalNode> set1 = Sets.newHashSet(cnf1);
     Set<EvalNode> set2 = Sets.newHashSet(cnf2);
     assertEquals(set1, set2);
   }
+
+  @Test
+  public final void testGetDNF() {
+    // "select score from people where (score > 1 and score < 3) or (7 < score and score < 10)", // 7
+    EvalNode node = getRootSelection(QUERIES[7]);
+    EvalNode [] cnf = AlgebraicUtil.toDisjunctiveNormalFormArray(node);
+    assertEquals(2, cnf.length);
+
+    assertEquals("people.score (INT4(0)) > 1 AND people.score (INT4(0)) < 3", cnf[0].toString());
+    assertEquals("7 < people.score (INT4(0)) AND people.score (INT4(0)) < 10", cnf[1].toString());
+  }
   
   @Test
   public final void testSimplify() throws PlanningException {
     Target [] targets = getRawTargets(QUERIES[0]);
-    EvalNode node = AlgebraicUtil.simplify(targets[0].getEvalTree());
+    EvalNode node = AlgebraicUtil.eliminateConstantExprs(targets[0].getEvalTree());
     EvalContext nodeCtx = node.newContext();
     assertEquals(EvalType.CONST, node.getType());
     node.eval(nodeCtx, null, null);
     assertEquals(7, node.terminate(nodeCtx).asInt4());
-    node = AlgebraicUtil.simplify(targets[1].getEvalTree());
+    node = AlgebraicUtil.eliminateConstantExprs(targets[1].getEvalTree());
     assertEquals(EvalType.CONST, node.getType());
     nodeCtx = node.newContext();
     node.eval(nodeCtx, null, null);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
index 6775aa9..389a9e5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
@@ -24,7 +24,6 @@ import org.apache.tajo.algebra.CreateTable;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.algebra.LiteralValue;
 import org.apache.tajo.algebra.OpType;
-import org.apache.tajo.engine.parser.SQLParser.Boolean_value_expressionContext;
 import org.apache.tajo.engine.parser.SQLParser.SqlContext;
 import org.apache.tajo.util.FileUtil;
 import org.junit.Test;
@@ -321,9 +320,9 @@ public class TestSQLAnalyzer {
     assertEquals(CreateTable.PartitionType.COLUMN, createTable.getPartition().getPartitionType());
     CreateTable.ColumnPartition columnPartition = createTable.getPartition();
     assertEquals(3, columnPartition.getColumns().length);
-    assertEquals("col1", columnPartition.getColumns()[0].getCanonicalName());
-    assertEquals("col2", columnPartition.getColumns()[1].getCanonicalName());
-    assertEquals("col3", columnPartition.getColumns()[2].getCanonicalName());
+    assertEquals("col3", columnPartition.getColumns()[0].getColumnName());
+    assertEquals("col4", columnPartition.getColumns()[1].getColumnName());
+    assertEquals("col5", columnPartition.getColumns()[2].getColumnName());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
index 50bc8dc..92faec0 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
@@ -83,7 +83,7 @@ public class TestLogicalOptimizer {
     catalog.createFunction(funcDesc);
     sqlAnalyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
+    optimizer = new LogicalOptimizer(util.getConfiguration());
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
index 025c84b..1b38acd 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
@@ -74,7 +74,7 @@ public class TestLogicalPlan {
       catalog.addTable(d);
     }
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
+    optimizer = new LogicalOptimizer(util.getConfiguration());
   }
 
   public static void tearDown() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index c9499d6..a8f4631 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -143,7 +143,7 @@ public class TestBSTIndexExec {
 
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
+    optimizer = new LogicalOptimizer(conf);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index 57f2376..c43046f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -122,7 +122,7 @@ public class TestHashAntiJoinExec {
     catalog.addTable(people);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
+    optimizer = new LogicalOptimizer(conf);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index d03f3c2..93f9c33 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -126,7 +126,7 @@ public class TestHashSemiJoinExec {
     catalog.addTable(people);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
+    optimizer = new LogicalOptimizer(conf);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index b57ef3a..b4c66a1 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -150,7 +150,7 @@ public class TestPhysicalPlanner {
     catalog.addTable(score);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
+    optimizer = new LogicalOptimizer(conf);
 
     masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index 45badd5..07f726d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -96,7 +96,7 @@ public class TestSortExec {
 
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
+    optimizer = new LogicalOptimizer(conf);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
index 629ddb5..b770de5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
@@ -240,205 +240,4 @@ public class TestInsertQuery {
       assertTrue(codec instanceof DeflateCodec);
     }
   }
-
-  @Test
-  public final void testInsertOverwritePartitionByColumn1() throws Exception {
-    String tableName ="InsertOverwritePartitionByColumn1";
-    ResultSet res = tpch.execute("create table " + tableName +" (col1 int4, col2 int4, col3 float8) partition by column(col1) ");
-    res.close();
-    TajoTestingCluster cluster = tpch.getTestingCluster();
-    CatalogService catalog = cluster.getMaster().getCatalog();
-    assertTrue(catalog.existsTable(tableName));
-
-    res = tpch.execute("insert overwrite into " + tableName
-        + " select l_orderkey, l_partkey, l_quantity from lineitem");
-    res.close();
-
-    TableDesc desc = catalog.getTableDesc(tableName);
-    Path path = desc.getPath();
-
-    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
-    assertTrue(fs.isDirectory(path));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
-    assertEquals(5, desc.getStats().getNumRows().intValue());
-  }
-
-  @Test
-  public final void testInsertOverwritePartitionByColumn2() throws Exception {
-    String tableName ="InsertOverwritePartitionByColumn2";
-    ResultSet res = tpch.execute("create table " + tableName +" (col1 int4, col2 int4, col3 float8) partition by column(col1, col2) ");
-    res.close();
-    TajoTestingCluster cluster = tpch.getTestingCluster();
-    CatalogService catalog = cluster.getMaster().getCatalog();
-    assertTrue(catalog.existsTable(tableName));
-
-    res = tpch.execute("insert overwrite into " + tableName
-        + " select l_orderkey, l_partkey, l_quantity from lineitem");
-    res.close();
-
-    TableDesc desc = catalog.getTableDesc(tableName);
-    Path path = desc.getPath();
-
-    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
-    assertTrue(fs.isDirectory(path));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
-    assertEquals(5, desc.getStats().getNumRows().intValue());
-  }
-
-  @Test
-  public final void testInsertOverwritePartitionByColumn3() throws Exception {
-    String tableName ="InsertOverwritePartitionByColumn3";
-    ResultSet res = tpch.execute("create table " + tableName +" (col1 int4, col2 int4, col3 float8) partition by column(col1, col2, col3) ");
-    res.close();
-    TajoTestingCluster cluster = tpch.getTestingCluster();
-    CatalogService catalog = cluster.getMaster().getCatalog();
-    assertTrue(catalog.existsTable(tableName));
-
-    res = tpch.execute("insert overwrite into " + tableName
-        + " select l_orderkey, l_partkey, l_quantity from lineitem");
-    res.close();
-
-    TableDesc desc = catalog.getTableDesc(tableName);
-    Path path = desc.getPath();
-
-    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
-    assertTrue(fs.isDirectory(path));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
-    assertEquals(5, desc.getStats().getNumRows().intValue());
-
-  }
-  @Test
-  public final void testInsertOverwritePartitionByColumnWithCompression1() throws Exception {
-    String tableName = "testInsertOverwritePartitionByColumnWithCompression1";
-    ResultSet res = tpch.execute("create table " + tableName + " (col1 int4, col2 int4, col3 float8) USING csv WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') partition by column(col1)");
-    res.close();
-    TajoTestingCluster cluster = tpch.getTestingCluster();
-    CatalogService catalog = cluster.getMaster().getCatalog();
-    assertTrue(catalog.existsTable(tableName));
-
-    res = tpch.execute("insert overwrite into " + tableName + " select  l_orderkey, l_partkey, l_quantity from lineitem");
-    res.close();
-    TableDesc desc = catalog.getTableDesc(tableName);
-    assertEquals(5, desc.getStats().getNumRows().intValue());
-
-    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
-    assertTrue(fs.exists(desc.getPath()));
-    CompressionCodecFactory factory = new CompressionCodecFactory(tpch.getTestingCluster().getConfiguration());
-
-    Path path = desc.getPath();
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
-
-    for (FileStatus partition : fs.listStatus(path)){
-      assertTrue(fs.isDirectory(partition.getPath()));
-      for (FileStatus file : fs.listStatus(partition.getPath())) {
-        CompressionCodec codec = factory.getCodec(file.getPath());
-        assertTrue(codec instanceof DeflateCodec);
-      }
-    }
-  }
-
-  @Test
-  public final void testInsertOverwritePartitionByColumnWithCompression2() throws Exception {
-    String tableName = "testInsertOverwritePartitionByColumnWithCompression2";
-    ResultSet res = tpch.execute("create table " + tableName + " (col1 int4, col2 int4, col3 float8) USING csv WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') partition by column(col1, col2)");
-    res.close();
-    TajoTestingCluster cluster = tpch.getTestingCluster();
-    CatalogService catalog = cluster.getMaster().getCatalog();
-    assertTrue(catalog.existsTable(tableName));
-
-    res = tpch.execute("insert overwrite into " + tableName + " select  l_orderkey, l_partkey, l_quantity from lineitem");
-    res.close();
-    TableDesc desc = catalog.getTableDesc(tableName);
-    assertEquals(5, desc.getStats().getNumRows().intValue());
-
-    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
-    assertTrue(fs.exists(desc.getPath()));
-    CompressionCodecFactory factory = new CompressionCodecFactory(tpch.getTestingCluster().getConfiguration());
-
-    Path path = desc.getPath();
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
-
-    for (FileStatus partition1 : fs.listStatus(path)){
-      assertTrue(fs.isDirectory(partition1.getPath()));
-      for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
-        assertTrue(fs.isDirectory(partition2.getPath()));
-        for (FileStatus file : fs.listStatus(partition2.getPath())) {
-          CompressionCodec codec = factory.getCodec(file.getPath());
-          assertTrue(codec instanceof DeflateCodec);
-        }
-      }
-    }
-  }
-
-  @Test
-  public final void testInsertOverwritePartitionByColumnWithCompression3() throws Exception {
-    String tableName = "testInsertOverwritePartitionByColumnWithCompression3";
-    ResultSet res = tpch.execute("create table " + tableName + " (col1 int4, col2 int4, col3 float8) USING csv WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') partition by column(col1, col2, col3)");
-    res.close();
-    TajoTestingCluster cluster = tpch.getTestingCluster();
-    CatalogService catalog = cluster.getMaster().getCatalog();
-    assertTrue(catalog.existsTable(tableName));
-
-    res = tpch.execute("insert overwrite into " + tableName + " select  l_orderkey, l_partkey, l_quantity from lineitem");
-    res.close();
-    TableDesc desc = catalog.getTableDesc(tableName);
-    assertEquals(5, desc.getStats().getNumRows().intValue());
-
-    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
-    assertTrue(fs.exists(desc.getPath()));
-    CompressionCodecFactory factory = new CompressionCodecFactory(tpch.getTestingCluster().getConfiguration());
-
-    Path path = desc.getPath();
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
-
-    for (FileStatus partition1 : fs.listStatus(path)){
-      assertTrue(fs.isDirectory(partition1.getPath()));
-      for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
-        assertTrue(fs.isDirectory(partition2.getPath()));
-        for (FileStatus partition3 : fs.listStatus(partition2.getPath())) {
-          assertTrue(fs.isDirectory(partition3.getPath()));
-          for (FileStatus file : fs.listStatus(partition3.getPath())) {
-            CompressionCodec codec = factory.getCodec(file.getPath());
-            assertTrue(codec instanceof DeflateCodec);
-          }
-        }
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
new file mode 100644
index 0000000..4fb5b8a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.TableDesc;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+public class TestTablePartitions {
+
+  private static TpchTestBase tpch;
+  public TestTablePartitions() throws IOException { // only delegates to super(); no per-instance setup
+    super();
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    tpch = TpchTestBase.getInstance(); // kept in a static field and used by every test method in this class
+  }
+
+  @Test
+  public final void testColumnPartitionedTableByOneColumn() throws Exception {
+    String tableName ="testColumnPartitionedTableByOneColumn";
+    ResultSet res = tpch.execute(
+        "create table " + tableName +" (col1 int4, col2 int4, null_col int4) partition by column(key float8) ");
+    res.close();
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+    // null_col is not in the insert column list; the query below relies on it reading back as null.
+    res = tpch.execute("insert overwrite into " + tableName
+        + " (col1, col2, key) select l_orderkey, l_partkey, l_quantity from lineitem");
+    res.close();
+
+    TableDesc desc = catalog.getTableDesc(tableName);
+    Path path = desc.getPath();
+    // One key=<l_quantity> directory is expected for each of the five inserted rows.
+    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
+    assertTrue(fs.isDirectory(path));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=17.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=36.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=38.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=45.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=49.0")));
+    assertEquals(5, desc.getStats().getNumRows().intValue());
+
+    res = tpch.execute(
+        "select distinct * from " + tableName + " where (key = 45.0 or key = 38.0) and null_col is null");
+    // Expected {col1, col2} per key; result columns are assumed to be (col1, col2, null_col, key).
+    Map<Double, int []> resultRows1 = Maps.newHashMap();
+    resultRows1.put(45.0d, new int[]{3, 2});
+    resultRows1.put(38.0d, new int[]{2, 2});
+    for (int i = 0; i < 2; i++) {
+      assertTrue(res.next());
+      assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(1));
+      assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(2));
+    }
+    res.close();
+  }
+
+  @Test
+  public final void testColumnPartitionedTableByThreeColumns() throws Exception {
+    String tableName ="testColumnPartitionedTableByThreeColumns";
+    ResultSet res = tpch.execute(
+        "create table " + tableName +" (col4 text) partition by column(col1 int4, col2 int4, col3 float8) ");
+    res.close();
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+    // The select list supplies col4 first, then the partition columns col1, col2 and col3.
+    res = tpch.execute("insert overwrite into " + tableName
+        + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
+    res.close();
+
+    TableDesc desc = catalog.getTableDesc(tableName);
+    Path path = desc.getPath();
+
+    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
+    assertTrue(fs.isDirectory(path));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
+    assertEquals(5, desc.getStats().getNumRows().intValue());
+
+    res = tpch.execute("select * from " + tableName + " where col2 = 2");
+    // Expected {col1, col2} keyed by col3; result columns are assumed to be (col4, col1, col2, col3).
+    Map<Double, int []> resultRows1 = Maps.newHashMap();
+    resultRows1.put(45.0d, new int[]{3, 2});
+    resultRows1.put(38.0d, new int[]{2, 2});
+    for (int i = 0; i < 2; i++) {
+      assertTrue(res.next());
+      assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(2));
+      assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(3));
+    }
+    res.close();
+
+    Map<Double, int []> resultRows2 = Maps.newHashMap();
+    resultRows2.put(49.0d, new int[]{3, 3});
+    resultRows2.put(45.0d, new int[]{3, 2});
+    resultRows2.put(38.0d, new int[]{2, 2});
+    res = tpch.execute("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
+    for (int i = 0; i < 3; i++) {
+      assertTrue(res.next());
+      assertEquals(resultRows2.get(res.getDouble(4))[0], res.getInt(2));
+      assertEquals(resultRows2.get(res.getDouble(4))[1], res.getInt(3));
+    }
+    assertFalse(res.next());
+    res.close();
+  }
+
+  @Test
+  public final void testColumnPartitionedTableByOneColumnsWithCompression() throws Exception {
+    String tableName = "testColumnPartitionedTableByOneColumnsWithCompression";
+    ResultSet res = tpch.execute(
+        "create table " + tableName + " (col2 int4, col3 float8) USING csv " +
+            "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
+            "PARTITION BY column(col1 int4)");
+    res.close();
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+    // l_orderkey is selected last and ends up in the partition column col1 (see the col1=<n> checks below).
+    res = tpch.execute(
+        "insert overwrite into " + tableName + " select  l_partkey, l_quantity, l_orderkey from lineitem");
+    res.close();
+    TableDesc desc = catalog.getTableDesc(tableName);
+    assertEquals(5, desc.getStats().getNumRows().intValue());
+
+    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
+    assertTrue(fs.exists(desc.getPath()));
+    CompressionCodecFactory factory = new CompressionCodecFactory(tpch.getTestingCluster().getConfiguration());
+
+    Path path = desc.getPath();
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+    // Every file under every partition directory must map to DeflateCodec when looked up by its path.
+    for (FileStatus partition : fs.listStatus(path)){
+      assertTrue(fs.isDirectory(partition.getPath()));
+      for (FileStatus file : fs.listStatus(partition.getPath())) {
+        CompressionCodec codec = factory.getCodec(file.getPath());
+        assertTrue(codec instanceof DeflateCodec);
+      }
+    }
+  }
+
+  @Test
+  public final void testColumnPartitionedTableByTwoColumnsWithCompression() throws Exception {
+    String tableName = "testColumnPartitionedTableByTwoColumnsWithCompression";
+    ResultSet res = tpch.execute("create table " + tableName + " (col3 float8, col4 text) USING csv " +
+        "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
+        "PARTITION by column(col1 int4, col2 int4)");
+    res.close();
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+    // The select list supplies col3 and col4 first, then the partition columns col1 and col2.
+    res = tpch.execute(
+        "insert overwrite into " + tableName +
+            " select  l_quantity, l_returnflag, l_orderkey, l_partkey from lineitem");
+    res.close();
+    TableDesc desc = catalog.getTableDesc(tableName);
+    assertEquals(5, desc.getStats().getNumRows().intValue());
+
+    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
+    assertTrue(fs.exists(desc.getPath()));
+    CompressionCodecFactory factory = new CompressionCodecFactory(tpch.getTestingCluster().getConfiguration());
+
+    Path path = desc.getPath();
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
+    // Walk both partition levels; only directories may appear above the leaf level, only Deflate files in it.
+    for (FileStatus partition1 : fs.listStatus(path)){
+      assertTrue(fs.isDirectory(partition1.getPath()));
+      for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
+        assertTrue(fs.isDirectory(partition2.getPath()));
+        for (FileStatus file : fs.listStatus(partition2.getPath())) {
+          CompressionCodec codec = factory.getCodec(file.getPath());
+          assertTrue(codec instanceof DeflateCodec);
+        }
+      }
+    }
+  }
+
+  @Test
+  public final void testColumnPartitionedTableByThreeColumnsWithCompression() throws Exception {
+    String tableName = "testColumnPartitionedTableByThreeColumnsWithCompression";
+    ResultSet res = tpch.execute(
+        "create table " + tableName + " (col4 text) USING csv " +
+            "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
+            "partition by column(col1 int4, col2 int4, col3 float8)");
+    res.close();
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+    // The select list supplies col4 first, then the partition columns col1, col2 and col3.
+    res = tpch.execute(
+        "insert overwrite into " + tableName +
+            " select  l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
+    res.close();
+    TableDesc desc = catalog.getTableDesc(tableName);
+    assertEquals(5, desc.getStats().getNumRows().intValue());
+
+    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
+    assertTrue(fs.exists(desc.getPath()));
+    CompressionCodecFactory factory = new CompressionCodecFactory(tpch.getTestingCluster().getConfiguration());
+
+    Path path = desc.getPath();
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
+    // Walk all three partition levels; every leaf file must map to DeflateCodec when looked up by its path.
+    for (FileStatus partition1 : fs.listStatus(path)){
+      assertTrue(fs.isDirectory(partition1.getPath()));
+      for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
+        assertTrue(fs.isDirectory(partition2.getPath()));
+        for (FileStatus partition3 : fs.listStatus(partition2.getPath())) {
+          assertTrue(fs.isDirectory(partition3.getPath()));
+          for (FileStatus file : fs.listStatus(partition3.getPath())) {
+            CompressionCodec codec = factory.getCodec(file.getPath());
+            assertTrue(codec instanceof DeflateCodec);
+          }
+        }
+      }
+    }
+
+    res = tpch.execute("select * from " + tableName + " where col2 = 2");
+    // Expected {col1, col2} keyed by col3; result columns are assumed to be (col4, col1, col2, col3).
+    Map<Double, int []> resultRows1 = Maps.newHashMap();
+    resultRows1.put(45.0d, new int[]{3, 2});
+    resultRows1.put(38.0d, new int[]{2, 2});
+    for (int i = 0; i < 2; i++) {
+      assertTrue(res.next());
+      assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(2));
+      assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(3));
+    }
+    res.close();
+
+    Map<Double, int []> resultRows2 = Maps.newHashMap();
+    resultRows2.put(49.0d, new int[]{3, 3});
+    resultRows2.put(45.0d, new int[]{3, 2});
+    resultRows2.put(38.0d, new int[]{2, 2});
+    res = tpch.execute("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
+    for (int i = 0; i < 3; i++) {
+      assertTrue(res.next());
+      assertEquals(resultRows2.get(res.getDouble(4))[0], res.getInt(2));
+      assertEquals(resultRows2.get(res.getDouble(4))[1], res.getInt(3));
+    }
+    assertFalse(res.next());
+    res.close();
+  }
+
+  @Test
+  public final void testColumnPartitionedTableNoMatchedPartition() throws Exception {
+    String tableName = "testColumnPartitionedTableNoMatchedPartition";
+    ResultSet res = tpch.execute(
+        "create table " + tableName + " (col4 text) USING csv " +
+            "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
+            "partition by column(col1 int4, col2 int4, col3 float8)");
+    res.close();
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+
+    res = tpch.execute(
+        "insert overwrite into " + tableName +
+            " select  l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
+    res.close();
+    TableDesc desc = catalog.getTableDesc(tableName);
+    assertEquals(5, desc.getStats().getNumRows().intValue());
+
+    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
+    assertTrue(fs.exists(desc.getPath()));
+    CompressionCodecFactory factory = new CompressionCodecFactory(tpch.getTestingCluster().getConfiguration());
+
+    Path path = desc.getPath();
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
+    // Walk all three partition levels; every leaf file must map to DeflateCodec when looked up by its path.
+    for (FileStatus partition1 : fs.listStatus(path)){
+      assertTrue(fs.isDirectory(partition1.getPath()));
+      for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
+        assertTrue(fs.isDirectory(partition2.getPath()));
+        for (FileStatus partition3 : fs.listStatus(partition2.getPath())) {
+          assertTrue(fs.isDirectory(partition3.getPath()));
+          for (FileStatus file : fs.listStatus(partition3.getPath())) {
+            CompressionCodec codec = factory.getCodec(file.getPath());
+            assertTrue(codec instanceof DeflateCodec);
+          }
+        }
+      }
+    }
+    // Only col2=1, col2=2 and col2=3 directories are asserted above, so col2 = 9 must return no rows.
+    res = tpch.execute("select * from " + tableName + " where col2 = 9");
+    assertFalse(res.next());
+    res.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
index 6b3d90f..3c4d3c2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.engine.util;
 
+import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.Type;
@@ -33,8 +34,7 @@ import org.apache.tajo.worker.dataserver.HttpUtil;
 import java.io.UnsupportedEncodingException;
 import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 public class TestTupleUtil {
   @Test
@@ -228,4 +228,52 @@ public class TestTupleUtil {
       assertEquals(range, result);
     }
   }
+
+  @Test
+  public void testBuildTupleFromPartitionPath() {
+    // Checks, for both values of the boolean argument, which paths yield a tuple and which yield null.
+    Schema schema = new Schema();
+    schema.addColumn("key1", Type.INT8);
+    schema.addColumn("key2", Type.TEXT);
+    // Case 1: table directory with no "key=value" segment; null is expected for true and for false.
+    Path path = new Path("hdfs://tajo/warehouse/partition_test/");
+    Tuple tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+    assertNull(tuple);
+    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, false);
+    assertNull(tuple);
+    // Case 2: path ends at key1=123; field 0 must be INT8 123 for both values (field 1 is not checked).
+    path = new Path("hdfs://tajo/warehouse/partition_test/key1=123");
+    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+    assertNotNull(tuple);
+    assertEquals(DatumFactory.createInt8(123), tuple.get(0));
+    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, false);
+    assertNotNull(tuple);
+    assertEquals(DatumFactory.createInt8(123), tuple.get(0));
+    // Case 3: "part-0000" directly follows key1=123; null is expected for true and for false.
+    path = new Path("hdfs://tajo/warehouse/partition_test/key1=123/part-0000"); // wrong case: "part-0000" is not a key=value segment
+    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+    assertNull(tuple);
+    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, false);
+    assertNull(tuple);
+    // Case 4: both columns present; fields 0 and 1 must be INT8 123 and TEXT "abc" for both values.
+    path = new Path("hdfs://tajo/warehouse/partition_test/key1=123/key2=abc");
+    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+    assertNotNull(tuple);
+    assertEquals(DatumFactory.createInt8(123), tuple.get(0));
+    assertEquals(DatumFactory.createText("abc"), tuple.get(1));
+    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, false);
+    assertNotNull(tuple);
+    assertEquals(DatumFactory.createInt8(123), tuple.get(0));
+    assertEquals(DatumFactory.createText("abc"), tuple.get(1));
+
+    // Case 5: "part-0001" follows both columns; the only case where true and false give different results.
+    path = new Path("hdfs://tajo/warehouse/partition_test/key1=123/key2=abc/part-0001");
+    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+    assertNull(tuple);
+    // NOTE(review): presumably true means "return null for a file path" - confirm against TupleUtil.
+    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, false);
+    assertNotNull(tuple);
+    assertEquals(DatumFactory.createInt8(123), tuple.get(0));
+    assertEquals(DatumFactory.createText("abc"), tuple.get(1));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index 5b6b5a3..4d78a27 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -74,7 +74,7 @@ public class TestExecutionBlockCursor {
 
     analyzer = new SQLAnalyzer();
     logicalPlanner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
+    optimizer = new LogicalOptimizer(conf);
 
     AbstractStorageManager sm  = StorageManagerFactory.getStorageManager(conf);
     dispatcher = new AsyncDispatcher();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
index ca1247a..0396f33 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
@@ -82,7 +82,7 @@ public class TestGlobalPlanner {
 
     sqlAnalyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
+    optimizer = new LogicalOptimizer(util.getConfiguration());
     globalPlanner = new GlobalPlanner(util.getConfiguration(),
         StorageManagerFactory.getStorageManager(util.getConfiguration()));
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 6934872..43ea5f8 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -79,7 +79,7 @@ public class TestRangeRetrieverHandler {
 
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
+    optimizer = new LogicalOptimizer(conf);
 
     schema = new Schema();
     schema.addColumn("empId", Type.INT4);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/queries/create_table_partition_by_column.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/queries/create_table_partition_by_column.sql b/tajo-core/tajo-core-backend/src/test/queries/create_table_partition_by_column.sql
index 65493a2..397e7ac 100644
--- a/tajo-core/tajo-core-backend/src/test/queries/create_table_partition_by_column.sql
+++ b/tajo-core/tajo-core-backend/src/test/queries/create_table_partition_by_column.sql
@@ -1,4 +1,4 @@
 CREATE TABLE sales ( col1 int, col2 int)
-PARTITION BY COLUMN (col1, col2, col3);
+PARTITION BY COLUMN (col3 int, col4 float, col5 text);