You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2016/04/20 22:39:35 UTC

[2/6] tajo git commit: TAJO-2108: Refactor Schema to be immutable.

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
index ef3336d..0cc93da 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
@@ -20,7 +20,7 @@ package org.apache.tajo.engine.query;
 
 import org.apache.tajo.*;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaFactory;
+import org.apache.tajo.catalog.SchemaBuilder;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.storage.StorageConstants;
@@ -133,9 +133,10 @@ public class TestSortQuery extends QueryTestCaseBase {
       tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
       tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
 
-      Schema schema = SchemaFactory.newV1();
-      schema.addColumn("col1", Type.INT4);
-      schema.addColumn("col2", Type.TEXT);
+      Schema schema = SchemaBuilder.builder()
+          .add("col1", Type.INT4)
+          .add("col2", Type.TEXT)
+          .build();
       String[] data = new String[]{
           "1|abc",
           "3|dfa",
@@ -239,9 +240,10 @@ public class TestSortQuery extends QueryTestCaseBase {
       tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
       tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
 
-      Schema schema = SchemaFactory.newV1();
-      schema.addColumn("id", Type.INT4);
-      schema.addColumn("name", Type.TEXT);
+      Schema schema = SchemaBuilder.builder()
+          .add("id", Type.INT4)
+          .add("name", Type.TEXT)
+          .build();
       String[] data = new String[]{
           "1|BRAZIL",
           "2|ALGERIA",
@@ -265,9 +267,10 @@ public class TestSortQuery extends QueryTestCaseBase {
     tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
     tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
 
-    Schema schema = SchemaFactory.newV1();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("name", Type.TEXT);
+    Schema schema = SchemaBuilder.builder()
+        .add("id", Type.INT4)
+        .add("name", Type.TEXT)
+        .build();
     String[] data = new String[]{ "1|111", "2|\\N", "3|333" };
     TajoTestingCluster.createTable("testSortOnNullColumn2".toLowerCase(), schema, tableOptions, data, 1);
 
@@ -302,9 +305,10 @@ public class TestSortQuery extends QueryTestCaseBase {
     tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
     tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
 
-    Schema schema = SchemaFactory.newV1();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("name", Type.TEXT);
+    Schema schema = SchemaBuilder.builder()
+        .add("id", Type.INT4)
+        .add("name", Type.TEXT)
+        .build();
     String[] data = new String[]{ "1|111", "2|\\N", "3|333" };
     TajoTestingCluster.createTable("testSortOnNullColumn3".toLowerCase(), schema, tableOptions, data, 1);
 
@@ -330,9 +334,10 @@ public class TestSortQuery extends QueryTestCaseBase {
     tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
     tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
 
-    Schema schema = SchemaFactory.newV1();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("name", Type.TEXT);
+    Schema schema = SchemaBuilder.builder()
+        .add("id", Type.INT4)
+        .add("name", Type.TEXT)
+        .build();
     String[] data = new String[]{ "1|111", "2|\\N", "3|333" };
     TajoTestingCluster.createTable("testSortOnNullColumn4".toLowerCase(), schema, tableOptions, data, 1);
 
@@ -358,9 +363,10 @@ public class TestSortQuery extends QueryTestCaseBase {
     tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
     tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
 
-    Schema schema = SchemaFactory.newV1();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("name", Type.TEXT);
+    Schema schema = SchemaBuilder.builder()
+        .add("id", Type.INT4)
+        .add("name", Type.TEXT)
+        .build();
     String[] data = new String[]{ "1|111", "2|\\N", "3|333" };
     TajoTestingCluster.createTable("testSortOnNullColumn5".toLowerCase(), schema, tableOptions, data, 1);
 
@@ -388,9 +394,10 @@ public class TestSortQuery extends QueryTestCaseBase {
       tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
       tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
 
-      Schema schema = SchemaFactory.newV1();
-      schema.addColumn("col1", Type.INT4);
-      schema.addColumn("col2", Type.TEXT);
+      Schema schema = SchemaBuilder.builder()
+          .add("col1", Type.INT4)
+          .add("col2", Type.TEXT)
+          .build();
       String[] data = new String[]{
           "1|하하하",
           "2|캬캬캬",
@@ -416,9 +423,10 @@ public class TestSortQuery extends QueryTestCaseBase {
       tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
       tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
 
-      Schema schema = SchemaFactory.newV1();
-      schema.addColumn("col1", Type.INT4);
-      schema.addColumn("col2", Type.TEXT);
+      Schema schema = SchemaBuilder.builder()
+          .add("col1", Type.INT4)
+          .add("col2", Type.TEXT)
+          .build();
       String[] data = new String[]{
           "1|하하하",
           "2|캬캬캬",

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index 3438759..196b488 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -23,16 +23,19 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.DeflateCodec;
-import org.apache.tajo.*;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.catalog.*;
-import org.apache.tajo.client.TajoClientUtil;
-import org.apache.tajo.exception.ReturnStateUtil;
 import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto;
+import org.apache.tajo.client.TajoClientUtil;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.exception.ReturnStateUtil;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.querymaster.QueryMasterTask;
@@ -1091,9 +1094,10 @@ public class TestTablePartitions extends QueryTestCaseBase {
       tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
       tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
 
-      Schema schema = SchemaFactory.newV1();
-      schema.addColumn("col1", TajoDataTypes.Type.TEXT);
-      schema.addColumn("col2", TajoDataTypes.Type.TEXT);
+      Schema schema = SchemaBuilder.builder()
+          .add("col1", TajoDataTypes.Type.TEXT)
+          .add("col2", TajoDataTypes.Type.TEXT)
+          .build();
 
       List<String> data = new ArrayList<>();
       int totalBytes = 0;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java
index 19b8bbc..54aa41e 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java
@@ -23,7 +23,7 @@ import org.apache.tajo.QueryTestCaseBase;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaFactory;
+import org.apache.tajo.catalog.SchemaBuilder;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.KeyValueSet;
@@ -266,9 +266,10 @@ public class TestWindowQuery extends QueryTestCaseBase {
     tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
     tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
 
-    Schema schema = SchemaFactory.newV1();
-    schema.addColumn("id", TajoDataTypes.Type.INT4);
-    schema.addColumn("time", TajoDataTypes.Type.TIME);
+    Schema schema = SchemaBuilder.builder()
+        .add("id", TajoDataTypes.Type.INT4)
+        .add("time", TajoDataTypes.Type.TIME)
+        .build();
     String[] data = new String[]{ "1|12:11:12", "2|10:11:13", "2|05:42:41" };
     TajoTestingCluster.createTable("firstvaluetime", schema, tableOptions, data, 1);
 
@@ -301,9 +302,10 @@ public class TestWindowQuery extends QueryTestCaseBase {
     tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
     tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
 
-    Schema schema = SchemaFactory.newV1();
-    schema.addColumn("id", TajoDataTypes.Type.INT4);
-    schema.addColumn("time", TajoDataTypes.Type.TIME);
+    Schema schema = SchemaBuilder.builder()
+        .add("id", TajoDataTypes.Type.INT4)
+        .add("time", TajoDataTypes.Type.TIME)
+        .build();
     String[] data = new String[]{ "1|12:11:12", "2|10:11:13", "2|05:42:41" };
     TajoTestingCluster.createTable("lastvaluetime", schema, tableOptions, data, 1);
 
@@ -336,9 +338,10 @@ public class TestWindowQuery extends QueryTestCaseBase {
     tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
     tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
 
-    Schema schema = SchemaFactory.newV1();
-    schema.addColumn("id", TajoDataTypes.Type.INT4);
-    schema.addColumn("time", TajoDataTypes.Type.TIME);
+    Schema schema = SchemaBuilder.builder()
+        .add("id", TajoDataTypes.Type.INT4)
+        .add("time", TajoDataTypes.Type.TIME)
+        .build();
     String[] data = new String[]{ "1|12:11:12", "2|10:11:13", "2|05:42:41" };
     TajoTestingCluster.createTable("lagtime", schema, tableOptions, data, 1);
 
@@ -385,9 +388,10 @@ public class TestWindowQuery extends QueryTestCaseBase {
     tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
     tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
 
-    Schema schema = SchemaFactory.newV1();
-    schema.addColumn("id", TajoDataTypes.Type.INT4);
-    schema.addColumn("time", TajoDataTypes.Type.TIME);
+    Schema schema = SchemaBuilder.builder()
+        .add("id", TajoDataTypes.Type.INT4)
+        .add("time", TajoDataTypes.Type.TIME)
+        .build();
     String[] data = new String[]{ "1|12:11:12", "2|10:11:13", "2|05:42:41" };
     TajoTestingCluster.createTable("leadtime", schema, tableOptions, data, 1);
 
@@ -441,10 +445,11 @@ public class TestWindowQuery extends QueryTestCaseBase {
     tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
     tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
 
-    Schema schema = SchemaFactory.newV1();
-    schema.addColumn("id", TajoDataTypes.Type.INT4);
-    schema.addColumn("time", TajoDataTypes.Type.TIME);
-    schema.addColumn("name", TajoDataTypes.Type.TEXT);
+    Schema schema = SchemaBuilder.builder()
+        .add("id", TajoDataTypes.Type.INT4)
+        .add("time", TajoDataTypes.Type.TIME)
+        .add("name", TajoDataTypes.Type.TEXT)
+        .build();
     String[] data = new String[]{ "1|12:11:12|abc", "2|10:11:13|def", "2|05:42:41|ghi" };
     TajoTestingCluster.createTable("multiwindow", schema, tableOptions, data, 1);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/BenchmarkSort.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/BenchmarkSort.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/BenchmarkSort.java
index 1cc526f..d681d11 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/BenchmarkSort.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/BenchmarkSort.java
@@ -96,7 +96,7 @@ public class BenchmarkSort {
     catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString());
 
-    Schema schema = SchemaFactory.newV1(new Column[] {
+    Schema schema = SchemaBuilder.builder().addAll(new Column[] {
         new Column("col0", Type.INT8),
         new Column("col1", Type.INT4),
         new Column("col2", Type.INT2),
@@ -112,7 +112,7 @@ public class BenchmarkSort {
         new Column("col12", Type.INT8),
         new Column("col13", Type.INT8),
         new Column("col14", Type.INT8),
-    });
+    }).build();
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT");
     Path employeePath = new Path(testDir, "employee.csv");

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
index 5683c7a..dff63c4 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
@@ -19,16 +19,17 @@
 package org.apache.tajo.engine.util;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaFactory;
+import org.apache.tajo.catalog.SchemaBuilder;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
 import org.apache.tajo.engine.planner.UniformRangePartition;
 import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
@@ -39,8 +40,7 @@ import static org.junit.Assert.*;
 public class TestTupleUtil {
   @Test
   public final void testFixedSizeChar() {
-    Schema schema = SchemaFactory.newV1();
-    schema.addColumn("col1", Type.CHAR, 5);
+    Schema schema = SchemaBuilder.builder().add("col1", CatalogUtil.newDataTypeWithLen(Type.CHAR, 5)).build();
 
     Tuple tuple = new VTuple(1);
     tuple.put(new Datum[] {
@@ -57,23 +57,22 @@ public class TestTupleUtil {
 
   @Test
   public final void testToBytesAndToTuple() {
-    Schema schema = SchemaFactory.newV1();
-    schema.addColumn("col1", Type.BOOLEAN);
-    schema.addColumn("col2", Type.BIT);
-    schema.addColumn("col3", Type.CHAR);
-    schema.addColumn("col4", Type.INT2);
-    schema.addColumn("col5", Type.INT4);
-    schema.addColumn("col6", Type.INT8);
-    schema.addColumn("col7", Type.FLOAT4);
-    schema.addColumn("col8", Type.FLOAT8);
-    schema.addColumn("col9", Type.TEXT);
-    schema.addColumn("col10", Type.BLOB);
-    schema.addColumn("col11", Type.INET4);
+    Schema schema = SchemaBuilder.builder()
+        .add("col1", Type.BOOLEAN)
+        .add("col2", Type.CHAR)
+        .add("col3", Type.INT2)
+        .add("col4", Type.INT4)
+        .add("col5", Type.INT8)
+        .add("col6", Type.FLOAT4)
+        .add("col7", Type.FLOAT8)
+        .add("col8", Type.TEXT)
+        .add("col9", Type.BLOB)
+        .add("col10", Type.INET4)
+        .build();
     //schema.addColumn("col11", DataType.IPv6);
     
     Tuple tuple = new VTuple(new Datum[] {
         DatumFactory.createBool(true),
-        DatumFactory.createBit((byte) 0x99),
         DatumFactory.createChar('7'),
         DatumFactory.createInt2((short) 17),
         DatumFactory.createInt4(59),
@@ -95,41 +94,38 @@ public class TestTupleUtil {
 
   @Test
   public final void testGetPartitions() {
-    VTuple sTuple = new VTuple(7);
-    VTuple eTuple = new VTuple(7);
-
-    Schema schema = SchemaFactory.newV1();
-
-    schema.addColumn("numByte", Type.BIT);
-    schema.addColumn("numChar", Type.CHAR);
-    schema.addColumn("numShort", Type.INT2);
-    schema.addColumn("numInt", Type.INT4);
-    schema.addColumn("numLong", Type.INT8);
-    schema.addColumn("numFloat", Type.FLOAT4);
-    schema.addColumn("numDouble", Type.FLOAT4);
+    VTuple sTuple = new VTuple(6);
+    VTuple eTuple = new VTuple(6);
+
+    Schema schema = SchemaBuilder.builder()
+        .add("numChar", Type.CHAR)
+        .add("numShort", Type.INT2)
+        .add("numInt", Type.INT4)
+        .add("numLong", Type.INT8)
+        .add("numFloat", Type.FLOAT4)
+        .add("numDouble", Type.FLOAT4)
+        .build();
 
     SortSpec[] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
 
-    sTuple.put(0, DatumFactory.createBit((byte) 44));
-    sTuple.put(1, DatumFactory.createChar('a'));
-    sTuple.put(2, DatumFactory.createInt2((short) 10));
-    sTuple.put(3, DatumFactory.createInt4(5));
-    sTuple.put(4, DatumFactory.createInt8(100));
-    sTuple.put(5, DatumFactory.createFloat4(100));
-    sTuple.put(6, DatumFactory.createFloat8(100));
-
-    eTuple.put(0, DatumFactory.createBit((byte) 99));
-    eTuple.put(1, DatumFactory.createChar('p'));
-    eTuple.put(2, DatumFactory.createInt2((short) 70));
-    eTuple.put(3, DatumFactory.createInt4(70));
-    eTuple.put(4, DatumFactory.createInt8(10000));
-    eTuple.put(5, DatumFactory.createFloat4(150));
-    eTuple.put(6, DatumFactory.createFloat8(170));
+    sTuple.put(0, DatumFactory.createChar('a'));
+    sTuple.put(1, DatumFactory.createInt2((short) 10));
+    sTuple.put(2, DatumFactory.createInt4(5));
+    sTuple.put(3, DatumFactory.createInt8(100));
+    sTuple.put(4, DatumFactory.createFloat4(100));
+    sTuple.put(5, DatumFactory.createFloat8(100));
+
+    eTuple.put(0, DatumFactory.createChar('p'));
+    eTuple.put(1, DatumFactory.createInt2((short) 70));
+    eTuple.put(2, DatumFactory.createInt4(70));
+    eTuple.put(3, DatumFactory.createInt8(10000));
+    eTuple.put(4, DatumFactory.createFloat4(150));
+    eTuple.put(5, DatumFactory.createFloat8(170));
 
     RangePartitionAlgorithm partitioner = new UniformRangePartition(new TupleRange(sortSpecs, sTuple, eTuple),
         sortSpecs);
-    TupleRange [] ranges = partitioner.partition(5);
-    assertTrue(5 <= ranges.length);
+    TupleRange [] ranges = partitioner.partition(4);
+    assertTrue(4 <= ranges.length);
     BaseTupleComparator comp = new BaseTupleComparator(schema, PlannerUtil.schemaToSortSpecs(schema));
     TupleRange prev = ranges[0];
     for (int i = 1; i < ranges.length; i++) {
@@ -142,9 +138,10 @@ public class TestTupleUtil {
   @Test
   public void testBuildTupleFromPartitionPath() {
 
-    Schema schema = SchemaFactory.newV1();
-    schema.addColumn("key1", Type.INT8);
-    schema.addColumn("key2", Type.TEXT);
+    Schema schema = SchemaBuilder.builder()
+        .add("key1", Type.INT8)
+        .add("key2", Type.TEXT)
+        .build();
 
     Path path = new Path("hdfs://tajo/warehouse/partition_test/");
     Tuple tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true);

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java
index 25f0e61..cce0f4c 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java
@@ -28,7 +28,7 @@ import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.TpchTestBase;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaFactory;
+import org.apache.tajo.catalog.SchemaBuilder;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes.Type;
@@ -63,10 +63,11 @@ public class TestRowFile {
 
   @Test
   public void test() throws IOException {
-    Schema schema = SchemaFactory.newV1();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("age", Type.INT8);
-    schema.addColumn("description", Type.TEXT);
+    Schema schema = SchemaBuilder.builder()
+        .add("id", Type.INT4)
+        .add("age", Type.INT8)
+        .add("description", Type.TEXT)
+        .build();
 
     TableMeta meta = CatalogUtil.newTableMeta("ROWFILE");
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-core-tests/src/test/resources/queries/TestCreateTable/create_table_various_types.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestCreateTable/create_table_various_types.sql b/tajo-core-tests/src/test/resources/queries/TestCreateTable/create_table_various_types.sql
index 891a139..bfa5e97 100644
--- a/tajo-core-tests/src/test/resources/queries/TestCreateTable/create_table_various_types.sql
+++ b/tajo-core-tests/src/test/resources/queries/TestCreateTable/create_table_various_types.sql
@@ -1,10 +1,10 @@
 -- Some types were commented out due to Hive meta test.
 
 create table various_types (
-  col0 bit,
-  col1 BIT(10),
-  col2 bit varying,
-  col3 bit VARYING(10),
+  -- col0 bit,
+  -- col1 BIT(10),
+  -- col2 bit varying,
+  -- col3 bit VARYING(10),
   col4 tinyint,
   col5 smallInt,
   col6 integer,
@@ -38,13 +38,13 @@ create table various_types (
   col34 national character varying (255),
   col35 date,
   col36 time,
-  col37 timetz,
-  col38 time With time zone,
-  col39 timesTamptz,
-  col40 timestamp with time zone,
-  col41 binary,
-  col42 binary(10),
-  col43 varbinary(10),
-  col44 binary Varying(10),
+  -- col37 timetz,
+  -- col38 time With time zone,
+  -- col39 timesTamptz,
+  -- col40 timestamp with time zone,
+  -- col41 binary,
+  -- col42 binary(10),
+  -- col43 varbinary(10),
+  -- col44 binary Varying(10),
   col45 blOb
 );
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
index 9ccfeb7..3ff773b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
+++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
@@ -24,7 +24,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaFactory;
+import org.apache.tajo.catalog.SchemaBuilder;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -70,117 +70,125 @@ public class TPCH extends BenchmarkSet {
 
   @Override
   public void loadSchemas() {
-    Schema lineitem = SchemaFactory.newV1()
-        .addColumn("l_orderkey", Type.INT4) // 0
-        .addColumn("l_partkey", Type.INT4) // 1
-        .addColumn("l_suppkey", Type.INT4) // 2
-        .addColumn("l_linenumber", Type.INT4) // 3
-        .addColumn("l_quantity", Type.FLOAT8) // 4
-        .addColumn("l_extendedprice", Type.FLOAT8) // 5
-        .addColumn("l_discount", Type.FLOAT8) // 6
-        .addColumn("l_tax", Type.FLOAT8) // 7
+    schemas.put(LINEITEM, SchemaBuilder.builder()
+        .add("l_orderkey", Type.INT4) // 0
+        .add("l_partkey", Type.INT4) // 1
+        .add("l_suppkey", Type.INT4) // 2
+        .add("l_linenumber", Type.INT4) // 3
+        .add("l_quantity", Type.FLOAT8) // 4
+        .add("l_extendedprice", Type.FLOAT8) // 5
+        .add("l_discount", Type.FLOAT8) // 6
+        .add("l_tax", Type.FLOAT8) // 7
             // TODO - This is temporal solution. 8 and 9 are actually Char type.
-        .addColumn("l_returnflag", Type.TEXT) // 8
-        .addColumn("l_linestatus", Type.TEXT) // 9
+        .add("l_returnflag", Type.TEXT) // 8
+        .add("l_linestatus", Type.TEXT) // 9
             // TODO - This is temporal solution. 10,11, and 12 are actually Date type.
-        .addColumn("l_shipdate", Type.TEXT) // 10
-        .addColumn("l_commitdate", Type.TEXT) // 11
-        .addColumn("l_receiptdate", Type.TEXT) // 12
-        .addColumn("l_shipinstruct", Type.TEXT) // 13
-        .addColumn("l_shipmode", Type.TEXT) // 14
-        .addColumn("l_comment", Type.TEXT); // 15
-    schemas.put(LINEITEM, lineitem);
-
-    Schema customer = SchemaFactory.newV1()
-        .addColumn("c_custkey", Type.INT4) // 0
-        .addColumn("c_name", Type.TEXT) // 1
-        .addColumn("c_address", Type.TEXT) // 2
-        .addColumn("c_nationkey", Type.INT4) // 3
-        .addColumn("c_phone", Type.TEXT) // 4
-        .addColumn("c_acctbal", Type.FLOAT8) // 5
-        .addColumn("c_mktsegment", Type.TEXT) // 6
-        .addColumn("c_comment", Type.TEXT); // 7
-    schemas.put(CUSTOMER, customer);
-
-    Schema customerParts = SchemaFactory.newV1()
-        .addColumn("c_custkey", Type.INT4) // 0
-        .addColumn("c_name", Type.TEXT) // 1
-        .addColumn("c_address", Type.TEXT) // 2
-        .addColumn("c_phone", Type.TEXT) // 3
-        .addColumn("c_acctbal", Type.FLOAT8) // 4
-        .addColumn("c_mktsegment", Type.TEXT) // 5
-        .addColumn("c_comment", Type.TEXT); // 6
-    schemas.put(CUSTOMER_PARTS, customerParts);
-
-    Schema nation = SchemaFactory.newV1()
-        .addColumn("n_nationkey", Type.INT4) // 0
-        .addColumn("n_name", Type.TEXT) // 1
-        .addColumn("n_regionkey", Type.INT4) // 2
-        .addColumn("n_comment", Type.TEXT); // 3
-    schemas.put(NATION, nation);
-
-    Schema part = SchemaFactory.newV1()
-        .addColumn("p_partkey", Type.INT4) // 0
-        .addColumn("p_name", Type.TEXT) // 1
-        .addColumn("p_mfgr", Type.TEXT) // 2
-        .addColumn("p_brand", Type.TEXT) // 3
-        .addColumn("p_type", Type.TEXT) // 4
-        .addColumn("p_size", Type.INT4) // 5
-        .addColumn("p_container", Type.TEXT) // 6
-        .addColumn("p_retailprice", Type.FLOAT8) // 7
-        .addColumn("p_comment", Type.TEXT); // 8
-    schemas.put(PART, part);
-
-    Schema region = SchemaFactory.newV1()
-        .addColumn("r_regionkey", Type.INT4) // 0
-        .addColumn("r_name", Type.TEXT) // 1
-        .addColumn("r_comment", Type.TEXT); // 2
-    schemas.put(REGION, region);
-
-    Schema orders = SchemaFactory.newV1()
-        .addColumn("o_orderkey", Type.INT4) // 0
-        .addColumn("o_custkey", Type.INT4) // 1
-        .addColumn("o_orderstatus", Type.TEXT) // 2
-        .addColumn("o_totalprice", Type.FLOAT8) // 3
-            // TODO - This is temporal solution. o_orderdate is actually Date type.
-        .addColumn("o_orderdate", Type.TEXT) // 4
-        .addColumn("o_orderpriority", Type.TEXT) // 5
-        .addColumn("o_clerk", Type.TEXT) // 6
-        .addColumn("o_shippriority", Type.INT4) // 7
-        .addColumn("o_comment", Type.TEXT); // 8
+        .add("l_shipdate", Type.TEXT) // 10
+        .add("l_commitdate", Type.TEXT) // 11
+        .add("l_receiptdate", Type.TEXT) // 12
+        .add("l_shipinstruct", Type.TEXT) // 13
+        .add("l_shipmode", Type.TEXT) // 14
+        .add("l_comment", Type.TEXT) // 15
+        .build());
+
+    schemas.put(CUSTOMER, SchemaBuilder.builder()
+        .add("c_custkey", Type.INT4) // 0
+        .add("c_name", Type.TEXT) // 1
+        .add("c_address", Type.TEXT) // 2
+        .add("c_nationkey", Type.INT4) // 3
+        .add("c_phone", Type.TEXT) // 4
+        .add("c_acctbal", Type.FLOAT8) // 5
+        .add("c_mktsegment", Type.TEXT) // 6
+        .add("c_comment", Type.TEXT) // 7
+        .build());
+
+
+    schemas.put(CUSTOMER_PARTS, SchemaBuilder.builder()
+        .add("c_custkey", Type.INT4) // 0
+        .add("c_name", Type.TEXT) // 1
+        .add("c_address", Type.TEXT) // 2
+        .add("c_phone", Type.TEXT) // 3
+        .add("c_acctbal", Type.FLOAT8) // 4
+        .add("c_mktsegment", Type.TEXT) // 5
+        .add("c_comment", Type.TEXT) // 6
+        .build());
+
+
+    schemas.put(NATION, SchemaBuilder.builder()
+        .add("n_nationkey", Type.INT4) // 0
+        .add("n_name", Type.TEXT) // 1
+        .add("n_regionkey", Type.INT4) // 2
+        .add("n_comment", Type.TEXT) // 3
+        .build());
+
+
+    schemas.put(PART, SchemaBuilder.builder()
+        .add("p_partkey", Type.INT4) // 0
+        .add("p_name", Type.TEXT) // 1
+        .add("p_mfgr", Type.TEXT) // 2
+        .add("p_brand", Type.TEXT) // 3
+        .add("p_type", Type.TEXT) // 4
+        .add("p_size", Type.INT4) // 5
+        .add("p_container", Type.TEXT) // 6
+        .add("p_retailprice", Type.FLOAT8) // 7
+        .add("p_comment", Type.TEXT) // 8
+        .build());
+
+
+    schemas.put(REGION, SchemaBuilder.builder()
+        .add("r_regionkey", Type.INT4) // 0
+        .add("r_name", Type.TEXT) // 1
+        .add("r_comment", Type.TEXT) // 2
+        .build());
+
+
+    Schema orders = SchemaBuilder.builder()
+        .add("o_orderkey", Type.INT4) // 0
+        .add("o_custkey", Type.INT4) // 1
+        .add("o_orderstatus", Type.TEXT) // 2
+        .add("o_totalprice", Type.FLOAT8) // 3
+        // TODO - This is temporal solution. o_orderdate is actually Date type.
+        .add("o_orderdate", Type.TEXT) // 4
+        .add("o_orderpriority", Type.TEXT) // 5
+        .add("o_clerk", Type.TEXT) // 6
+        .add("o_shippriority", Type.INT4) // 7
+        .add("o_comment", Type.TEXT) // 8
+        .build();
     schemas.put(ORDERS, orders);
     schemas.put(EMPTY_ORDERS, orders);
 
 
-    Schema partsupp = SchemaFactory.newV1()
-        .addColumn("ps_partkey", Type.INT4) // 0
-        .addColumn("ps_suppkey", Type.INT4) // 1
-        .addColumn("ps_availqty", Type.INT4) // 2
-        .addColumn("ps_supplycost", Type.FLOAT8) // 3
-        .addColumn("ps_comment", Type.TEXT); // 4
-    schemas.put(PARTSUPP, partsupp);
-
-    Schema supplier = SchemaFactory.newV1()
-        .addColumn("s_suppkey", Type.INT4) // 0
-        .addColumn("s_name", Type.TEXT) // 1
-        .addColumn("s_address", Type.TEXT) // 2
-        .addColumn("s_nationkey", Type.INT4) // 3
-        .addColumn("s_phone", Type.TEXT) // 4
-        .addColumn("s_acctbal", Type.FLOAT8) // 5
-        .addColumn("s_comment", Type.TEXT); // 6
-    schemas.put(SUPPLIER, supplier);
+    schemas.put(PARTSUPP, SchemaBuilder.builder()
+        .add("ps_partkey", Type.INT4) // 0
+        .add("ps_suppkey", Type.INT4) // 1
+        .add("ps_availqty", Type.INT4) // 2
+        .add("ps_supplycost", Type.FLOAT8) // 3
+        .add("ps_comment", Type.TEXT) // 4
+        .build());
+
+
+    schemas.put(SUPPLIER, SchemaBuilder.builder()
+        .add("s_suppkey", Type.INT4) // 0
+        .add("s_name", Type.TEXT) // 1
+        .add("s_address", Type.TEXT) // 2
+        .add("s_nationkey", Type.INT4) // 3
+        .add("s_phone", Type.TEXT) // 4
+        .add("s_acctbal", Type.FLOAT8) // 5
+        .add("s_comment", Type.TEXT) // 6
+        .build());
   }
 
   public void loadOutSchema() {
-    Schema q2 = SchemaFactory.newV1()
-        .addColumn("s_acctbal", Type.FLOAT8)
-        .addColumn("s_name", Type.TEXT)
-        .addColumn("n_name", Type.TEXT)
-        .addColumn("p_partkey", Type.INT4)
-        .addColumn("p_mfgr", Type.TEXT)
-        .addColumn("s_address", Type.TEXT)
-        .addColumn("s_phone", Type.TEXT)
-        .addColumn("s_comment", Type.TEXT);
+    Schema q2 = SchemaBuilder.builder()
+        .add("s_acctbal", Type.FLOAT8)
+        .add("s_name", Type.TEXT)
+        .add("n_name", Type.TEXT)
+        .add("p_partkey", Type.INT4)
+        .add("p_mfgr", Type.TEXT)
+        .add("s_address", Type.TEXT)
+        .add("s_phone", Type.TEXT)
+        .add("s_comment", Type.TEXT)
+        .build();
     outSchemas.put("q2", q2);
   }
 
@@ -208,8 +216,8 @@ public class TPCH extends BenchmarkSet {
 
     PartitionMethodDesc partitionMethodDesc = null;
     if (tableName.equals(CUSTOMER_PARTS)) {
-      Schema expressionSchema = SchemaFactory.newV1();
-      expressionSchema.addColumn("c_nationkey", TajoDataTypes.Type.INT4);
+      Schema expressionSchema = SchemaBuilder.builder()
+      .add("c_nationkey", TajoDataTypes.Type.INT4).build();
       partitionMethodDesc = new PartitionMethodDesc(
           tajo.getCurrentDatabase(),
           CUSTOMER_PARTS,

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
index eb8f7ad..720c337 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaBuilder;
 import org.apache.tajo.catalog.SchemaFactory;
 import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
 import org.apache.tajo.common.TajoDataTypes.Type;
@@ -645,18 +646,17 @@ public class DistinctGroupbyBuilder {
     //Set SecondStage ColumnId and Input schema
     secondStageDistinctNode.setResultColumnIds(secondStageColumnIds);
 
-    Schema secondStageInSchema = SchemaFactory.newV1();
+    SchemaBuilder secondStageInSchema = SchemaBuilder.uniqueNameBuilder();
+
     //TODO merged tuple schema
     int index = 0;
     for(GroupbyNode eachNode: secondStageDistinctNode.getSubPlans()) {
       eachNode.setInSchema(firstStageDistinctNode.getOutSchema());
       for (Column column: eachNode.getOutSchema().getRootColumns()) {
-        if (secondStageInSchema.getColumn(column) == null) {
-          secondStageInSchema.addColumn(column);
-        }
+        secondStageInSchema.add(column);
       }
     }
-    secondStageDistinctNode.setInSchema(secondStageInSchema);
+    secondStageDistinctNode.setInSchema(secondStageInSchema.build());
 
     return new DistinctGroupbyNode[]{firstStageDistinctNode, secondStageDistinctNode};
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 8002989..3be1d36 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -20,10 +20,7 @@ package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaFactory;
-import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -43,6 +40,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 public class BSTIndexScanExec extends ScanExec {
@@ -94,18 +92,19 @@ public class BSTIndexScanExec extends ScanExec {
   }
 
   private static Schema mergeSubSchemas(Schema originalSchema, Schema subSchema, List<Target> targets, EvalNode qual) {
-    Schema mergedSchema = SchemaFactory.newV1();
     Set<Column> qualAndTargets = new HashSet<>();
     qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(qual));
     for (Target target : targets) {
       qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(target.getEvalTree()));
     }
+
+    SchemaBuilder mergedSchema = SchemaBuilder.builder();
     for (Column column : originalSchema.getRootColumns()) {
       if (subSchema.contains(column) || qualAndTargets.contains(column)) {
-        mergedSchema.addColumn(column);
+        mergedSchema.add(column);
       }
     }
-    return mergedSchema;
+    return mergedSchema.build();
   }
 
   @Override
@@ -127,33 +126,12 @@ public class BSTIndexScanExec extends ScanExec {
   public void init() throws IOException {
     reader.init();
 
-    Schema projected;
-
-    // in the case where projected column or expression are given
-    // the target can be an empty list.
-    if (plan.hasTargets()) {
-      projected = SchemaFactory.newV1();
-      Set<Column> columnSet = new HashSet<>();
-
-      if (plan.hasQual()) {
-        columnSet.addAll(EvalTreeUtil.findUniqueColumns(qual));
-      }
-
-      for (Target t : plan.getTargets()) {
-        columnSet.addAll(EvalTreeUtil.findUniqueColumns(t.getEvalTree()));
-      }
-
-      for (Column column : inSchema.getAllColumns()) {
-        if (columnSet.contains(column)) {
-          projected.addColumn(column);
-        }
-      }
-
-    } else {
-      // no any projected columns, meaning that all columns should be projected.
-      // TODO - this implicit rule makes code readability bad. So, we should remove it later
-      projected = outSchema;
-    }
+    final Schema projected = SeqScanExec.getProjectSchema(
+        plan.getInSchema(),
+        plan.getOutSchema(),
+        Optional.ofNullable(plan.getTargets()),
+        Optional.ofNullable(plan.getQual())
+    );
 
     initScanner(projected);
     super.init();

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
index 1142095..83a9ff8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
@@ -25,7 +25,10 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.SessionVars;
-import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.SchemaBuilder;
+import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.PartitionKeyProto;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -33,7 +36,10 @@ import org.apache.tajo.plan.logical.CreateTableNode;
 import org.apache.tajo.plan.logical.InsertNode;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.logical.StoreTableNode;
-import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.FileTablespace;
+import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.StringUtils;
 import org.apache.tajo.worker.TaskAttemptContext;
@@ -84,7 +90,7 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec {
     if (plan.getType() == NodeType.INSERT && keyNum > 0) {
       Column[] removedPartitionColumns = new Column[this.outSchema.size() - keyNum];
       System.arraycopy(this.outSchema.toArray(), 0, removedPartitionColumns, 0, removedPartitionColumns.length);
-      this.outSchema = SchemaFactory.newV1(removedPartitionColumns);
+      this.outSchema = SchemaBuilder.builder().addAll(removedPartitionColumns).build();
     }
 
     keyIds = new int[keyNum];

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 04b23f8..dc48f3f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -36,14 +36,13 @@ import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.Scanner;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 
 
 public class SeqScanExec extends ScanExec {
@@ -136,36 +135,37 @@ public class SeqScanExec extends ScanExec {
     }
   }
 
-  public Schema getProjectSchema() {
-    Schema projected;
+  public static Schema getProjectSchema(Schema inSchema, Schema outSchema,
+                                        Optional<Collection<Target>> targets,
+                                        Optional<EvalNode> qual) {
+    SchemaBuilder projected = SchemaBuilder.builder();
 
     // in the case where projected column or expression are given
     // the target can be an empty list.
-    if (plan.hasTargets()) {
-      projected = SchemaFactory.newV1();
+    if (targets.isPresent()) {
       Set<Column> columnSet = new HashSet<>();
 
-      if (plan.hasQual()) {
-        columnSet.addAll(EvalTreeUtil.findUniqueColumns(qual));
+      if (qual.isPresent()) {
+        columnSet.addAll(EvalTreeUtil.findUniqueColumns(qual.get()));
       }
 
-      for (Target t : plan.getTargets()) {
+      for (Target t : targets.get()) {
         columnSet.addAll(EvalTreeUtil.findUniqueColumns(t.getEvalTree()));
       }
 
       for (Column column : inSchema.getAllColumns()) {
         if (columnSet.contains(column)) {
-          projected.addColumn(column);
+          projected.add(column);
         }
       }
 
+      return projected.build();
+
     } else {
       // no any projected columns, meaning that all columns should be projected.
       // TODO - this implicit rule makes code readability bad. So, we should remove it later
-      projected = outSchema;
+      return outSchema;
     }
-
-    return projected;
   }
 
   private void initScanIterator() {
@@ -187,7 +187,12 @@ public class SeqScanExec extends ScanExec {
       scanIt = new EmptyScanIterator();
 
     } else {
-      Schema projectedFields = getProjectSchema();
+      Schema projectedFields = getProjectSchema(
+          plan.getInSchema(),
+          plan.getOutSchema(),
+          Optional.ofNullable(plan.getTargets()),
+          Optional.ofNullable(plan.getQual())
+      );
       initScanner(projectedFields);
 
       // See Scanner.isProjectable() method. Depending on the result of isProjectable(),

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
index 04a4a19..4b4bfeb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
@@ -21,7 +21,7 @@ package org.apache.tajo.engine.planner.physical;
 import com.google.common.collect.Lists;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaFactory;
+import org.apache.tajo.catalog.SchemaBuilder;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.plan.expr.EvalNode;
@@ -120,7 +120,7 @@ public class WindowAggExec extends UnaryPhysicalExec {
       endCurrentRowFlags = new boolean[functions.length];
 
       List<Column> additionalSortKeyColumns = Lists.newArrayList();
-      Schema rewrittenSchema = SchemaFactory.newV1(outSchema);
+      Schema rewrittenSchema = SchemaBuilder.builder().addAll(outSchema.getRootColumns()).build();
       for (int i = 0; i < functions.length; i++) {
         WindowSpec.WindowEndBound endBound = functions[i].getWindowFrame().getEndBound();
         switch (endBound.getBoundType()) {
@@ -161,11 +161,13 @@ public class WindowAggExec extends UnaryPhysicalExec {
       }
 
       sortKeyColumns = new int[additionalSortKeyColumns.size()];
-      schemaForOrderBy = SchemaFactory.newV1(outSchema);
+      SchemaBuilder schemaForOrderByBld = SchemaBuilder.builder();
+      schemaForOrderByBld.addAll(outSchema.getRootColumns());
       for (int i = 0; i < additionalSortKeyColumns.size(); i++) {
         sortKeyColumns[i] = i;
-        schemaForOrderBy.addColumn(additionalSortKeyColumns.get(i));
+        schemaForOrderByBld.add(additionalSortKeyColumns.get(i));
       }
+      schemaForOrderBy = schemaForOrderByBld.build();
     } else {
       functions = new WindowFunctionEval[0];
       functionNum = 0;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainPlanPreprocessorForTest.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainPlanPreprocessorForTest.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainPlanPreprocessorForTest.java
index 47d4b4f..c16e95b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainPlanPreprocessorForTest.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainPlanPreprocessorForTest.java
@@ -21,6 +21,7 @@ package org.apache.tajo.master.exec;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaBuilder;
 import org.apache.tajo.catalog.SchemaFactory;
 import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.plan.LogicalPlan;
@@ -155,11 +156,7 @@ public class ExplainPlanPreprocessorForTest {
       Column[] columns = schema.toArray();
       Arrays.sort(columns, columnComparator);
 
-      Schema sorted = SchemaFactory.newV1();
-      for (Column col : columns) {
-        sorted.addColumn(col);
-      }
-      return sorted;
+      return SchemaBuilder.builder().addAll(columns).build();
     }
 
     private EvalNode sortQual(EvalNode qual) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index da11bd8..1a51d98 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -218,8 +218,8 @@ public class QueryExecutor {
       explainStr = PlannerUtil.buildExplainString(plan.getRootBlock().getRoot());
     }
 
-    Schema schema = SchemaFactory.newV1();
-    schema.addColumn("explain", TajoDataTypes.Type.TEXT);
+    Schema schema = SchemaBuilder.builder()
+        .add("explain", TajoDataTypes.Type.TEXT).build();
 
     SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder();
     MemoryRowBlock rowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(schema));

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index 8fdd6ce..2a688e5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -634,7 +634,7 @@ public class Repartitioner {
     ExecutionBlock sampleChildBlock = masterPlan.getChild(stage.getId(), 0);
     SortNode sortNode = PlannerUtil.findTopNode(sampleChildBlock.getPlan(), NodeType.SORT);
     SortSpec [] sortSpecs = sortNode.getSortKeys();
-    Schema sortSchema = SchemaFactory.newV1(channel.getShuffleKeys());
+    Schema sortSchema = SchemaBuilder.builder().addAll(channel.getShuffleKeys()).build();
 
     TupleRange[] ranges;
     int determinedTaskNum;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDatabaseMetaData.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDatabaseMetaData.java
index f4bf2b0..e3cf73a 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDatabaseMetaData.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDatabaseMetaData.java
@@ -605,21 +605,22 @@ public class TajoDatabaseMetaData implements DatabaseMetaData {
         , new ArrayList<MetaDataTuple>());
   }
 
-  private final static Schema importedExportedSchema = SchemaFactory.newV1()
-      .addColumn("PKTABLE_CAT", Type.VARCHAR)   // 0
-      .addColumn("PKTABLE_SCHEM", Type.VARCHAR) // 1
-      .addColumn("PKTABLE_NAME", Type.VARCHAR)  // 2
-      .addColumn("PKCOLUMN_NAME", Type.VARCHAR) // 3
-      .addColumn("FKTABLE_CAT", Type.VARCHAR)   // 4
-      .addColumn("FKTABLE_SCHEM", Type.VARCHAR) // 5
-      .addColumn("FKTABLE_NAME", Type.VARCHAR)  // 6
-      .addColumn("FKCOLUMN_NAME", Type.VARCHAR) // 7
-      .addColumn("KEY_SEQ", Type.INT2)          // 8
-      .addColumn("UPDATE_RULE", Type.INT2)      // 9
-      .addColumn("DELETE_RULE", Type.INT2)      // 10
-      .addColumn("FK_NAME", Type.VARCHAR)       // 11
-      .addColumn("PK_NAME", Type.VARCHAR)       // 12
-      .addColumn("DEFERRABILITY", Type.INT2);   // 13
+  private final static Schema importedExportedSchema = SchemaBuilder.builder()
+      .add("PKTABLE_CAT", Type.VARCHAR)   // 0
+      .add("PKTABLE_SCHEM", Type.VARCHAR) // 1
+      .add("PKTABLE_NAME", Type.VARCHAR)  // 2
+      .add("PKCOLUMN_NAME", Type.VARCHAR) // 3
+      .add("FKTABLE_CAT", Type.VARCHAR)   // 4
+      .add("FKTABLE_SCHEM", Type.VARCHAR) // 5
+      .add("FKTABLE_NAME", Type.VARCHAR)  // 6
+      .add("FKCOLUMN_NAME", Type.VARCHAR) // 7
+      .add("KEY_SEQ", Type.INT2)          // 8
+      .add("UPDATE_RULE", Type.INT2)      // 9
+      .add("DELETE_RULE", Type.INT2)      // 10
+      .add("FK_NAME", Type.VARCHAR)       // 11
+      .add("PK_NAME", Type.VARCHAR)       // 12
+      .add("DEFERRABILITY", Type.INT2)    // 13
+      .build();
 
   @Override
   public ResultSet getImportedKeys(String catalog, String schema, String table) throws SQLException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestResultSet.java b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
index 0fbb9aa..0e9b2a7 100644
--- a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
+++ b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
@@ -25,12 +25,12 @@ import com.google.protobuf.ByteString;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.IntegrationTest;
 import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoProtos.CodecType;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.TpchTestBase;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.client.TajoClient;
-import org.apache.tajo.TajoProtos.CodecType;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.DatumFactory;
@@ -69,9 +69,10 @@ public class TestResultSet {
     conf = util.getConfiguration();
     sm = TablespaceManager.getDefault();
 
-    scoreSchema = SchemaFactory.newV1();
-    scoreSchema.addColumn("deptname", Type.TEXT);
-    scoreSchema.addColumn("score", Type.INT4);
+    scoreSchema = SchemaBuilder.builder()
+        .add("deptname", Type.TEXT)
+        .add("score", Type.INT4)
+        .build();
     scoreMeta = CatalogUtil.newTableMeta("TEXT");
     rowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(scoreSchema));
     TableStats stats = new TableStats();
@@ -197,10 +198,11 @@ public class TestResultSet {
       String query = "select col1, col2, col3 from " + tableName;
 
       String [] table = new String[] {tableName};
-      Schema schema = SchemaFactory.newV1();
-      schema.addColumn("col1", Type.DATE);
-      schema.addColumn("col2", Type.TIME);
-      schema.addColumn("col3", Type.TIMESTAMP);
+      Schema schema = SchemaBuilder.builder()
+          .add("col1", Type.DATE)
+          .add("col2", Type.TIME)
+          .add("col3", Type.TIMESTAMP)
+          .build();
       Schema [] schemas = new Schema[] {schema};
       String [] data = {
           "2014-01-01|01:00:00|2014-01-01 01:00:00"

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbcNegative.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbcNegative.java b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbcNegative.java
index 56755a5..07207dd 100644
--- a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbcNegative.java
+++ b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbcNegative.java
@@ -169,7 +169,7 @@ public class TestTajoJdbcNegative extends QueryTestCaseBase {
 
     try (Statement stmt = conn.createStatement()) {
       stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS TestTajoJdbcNegative");
-      stmt.executeUpdate("CREATE TABLE TestTajoJdbcNegative.table123u8sd ( name RECORD(last TEXT, first TEXT) )");
+      stmt.executeUpdate("CREATE TABLE TestTajoJdbcNegative.table123u8sd ( name RECORD (last TEXT, first TEXT) )");
 
       try (ResultSet resultSet = stmt.executeQuery("select name FROM TestTajoJdbcNegative.table123u8sd")) {
         fail("Getting a record type field must be failed");

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
index f791a3d..9176c2f 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.plan;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -32,13 +33,13 @@ import org.apache.tajo.SessionVars;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.algebra.WindowSpec;
 import org.apache.tajo.catalog.*;
-import org.apache.tajo.exception.*;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.exception.*;
 import org.apache.tajo.plan.LogicalPlan.QueryBlock;
 import org.apache.tajo.plan.algebra.BaseAlgebraVisitor;
 import org.apache.tajo.plan.expr.*;
@@ -52,6 +53,7 @@ import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.StringUtils;
 
+import javax.annotation.Nullable;
 import java.net.URI;
 import java.util.*;
 
@@ -1228,21 +1230,10 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   }
 
   private static Schema getNaturalJoinSchema(LogicalNode left, LogicalNode right) {
-    Schema joinSchema = SchemaFactory.newV1();
-    Schema commons = SchemaUtil.getNaturalJoinColumns(left.getOutSchema(), right.getOutSchema());
-    joinSchema.addColumns(commons);
-    for (Column c : left.getOutSchema().getRootColumns()) {
-      if (!joinSchema.contains(c.getQualifiedName())) {
-        joinSchema.addColumn(c);
-      }
-    }
-
-    for (Column c : right.getOutSchema().getRootColumns()) {
-      if (!joinSchema.contains(c.getQualifiedName())) {
-        joinSchema.addColumn(c);
-      }
-    }
-    return joinSchema;
+    SchemaBuilder joinSchema = SchemaBuilder.uniqueNameBuilder();
+    joinSchema.addAll(left.getOutSchema().getRootColumns());
+    joinSchema.addAll(right.getOutSchema().getRootColumns());
+    return joinSchema.build();
   }
 
   private static EvalNode getNaturalJoinCondition(JoinNode joinNode) {
@@ -1677,7 +1668,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       // See PreLogicalPlanVerifier.visitInsert.
       // It guarantees that the equivalence between the numbers of target and projected columns.
       ColumnReferenceExpr [] targets = expr.getTargetColumns();
-      Schema targetColumns = SchemaFactory.newV1();
+      final SchemaBuilder targetColumnsBld = SchemaBuilder.builder();
       for (ColumnReferenceExpr target : targets) {
         Column targetColumn = desc.getLogicalSchema().getColumn(target.getCanonicalName().replace(".", "/"));
 
@@ -1685,8 +1676,9 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
           throw makeSyntaxError("column '" + target + "' of relation '" + desc.getName() + "' does not exist");
         }
 
-        targetColumns.addColumn(targetColumn);
+        targetColumnsBld.add(targetColumn);
       }
+      final Schema targetColumns = targetColumnsBld.build();
       insertNode.setTargetSchema(targetColumns);
       insertNode.setOutSchema(targetColumns);
       buildProjectedInsert(context, insertNode);
@@ -1697,11 +1689,11 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       Schema tableSchema = desc.getLogicalSchema();
       Schema projectedSchema = insertNode.getChild().getOutSchema();
 
-      Schema targetColumns = SchemaFactory.newV1();
+      SchemaBuilder targetColumns = SchemaBuilder.builder();
       for (int i = 0; i < projectedSchema.size(); i++) {
-        targetColumns.addColumn(tableSchema.getColumn(i));
+        targetColumns.add(tableSchema.getColumn(i));
       }
-      insertNode.setTargetSchema(targetColumns);
+      insertNode.setTargetSchema(targetColumns.build());
       buildProjectedInsert(context, insertNode);
     }
 
@@ -1956,15 +1948,16 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
               queryOutputSchema.size() < partitionExpressionSchema.size()) {
             throw makeSyntaxError("Partition columns cannot be more than table columns.");
           }
-          Schema tableSchema = SchemaFactory.newV1();
+          SchemaBuilder tableSchemaBld = SchemaBuilder.builder();
           for (int i = 0; i < queryOutputSchema.size() - partitionExpressionSchema.size(); i++) {
-            tableSchema.addColumn(queryOutputSchema.getColumn(i));
+            tableSchemaBld.add(queryOutputSchema.getColumn(i));
           }
+          Schema tableSchema = tableSchemaBld.build();
           createTableNode.setOutSchema(tableSchema);
           createTableNode.setTableSchema(tableSchema);
         } else {
           // Convert the schema of subquery into the target table's one.
-          Schema schema = SchemaFactory.newV1(subQuery.getOutSchema());
+          Schema schema = SchemaBuilder.builder().addAll(subQuery.getOutSchema().getRootColumns()).build();
           schema.setQualifier(createTableNode.getTableName());
           createTableNode.setOutSchema(schema);
           createTableNode.setTableSchema(schema);
@@ -2040,13 +2033,12 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
    * @return schema transformed from table definition elements
    */
   private Schema convertColumnsToSchema(ColumnDefinition[] elements) {
-    Schema schema = SchemaFactory.newV1();
-
-    for (ColumnDefinition columnDefinition: elements) {
-      schema.addColumn(convertColumn(columnDefinition));
-    }
-
-    return schema;
+    return SchemaBuilder.builder().addAll(elements, new Function<ColumnDefinition, Column>() {
+      @Override
+      public Column apply(@Nullable ColumnDefinition input) {
+        return convertColumn(input);
+      }
+    }).build();
   }
 
   /**
@@ -2056,13 +2048,12 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
    * @return schema transformed from table definition elements
    */
   private static Schema convertTableElementsSchema(ColumnDefinition[] elements) {
-    Schema schema = SchemaFactory.newV1();
-
-    for (ColumnDefinition columnDefinition: elements) {
-      schema.addColumn(convertColumn(columnDefinition));
-    }
-
-    return schema;
+    return SchemaBuilder.builder().addAll(elements, new Function<ColumnDefinition, Column>() {
+      @Override
+      public Column apply(@Nullable ColumnDefinition input) {
+        return convertColumn(input);
+      }
+    }).build();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java
index 31cd1c6..c71dba2 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java
@@ -18,16 +18,14 @@
 
 package org.apache.tajo.plan.expr;
 
+import com.google.common.base.Function;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.tajo.algebra.ColumnReferenceExpr;
 import org.apache.tajo.algebra.NamedExpr;
 import org.apache.tajo.algebra.OpType;
 import org.apache.tajo.annotation.Nullable;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaFactory;
+import org.apache.tajo.catalog.*;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.exception.TajoInternalError;
@@ -143,16 +141,15 @@ public class EvalTreeUtil {
     node.postOrder(finder);
     return finder.getColumnRefs();
   }
-  
+
   public static Schema getSchemaByTargets(Schema inputSchema, List<Target> targets) {
-    Schema schema = SchemaFactory.newV1();
-    for (Target target : targets) {
-      schema.addColumn(
-          target.hasAlias() ? target.getAlias() : target.getEvalTree().getName(),
-          getDomainByExpr(inputSchema, target.getEvalTree()));
-    }
-    
-    return schema;
+    return SchemaBuilder.builder().addAll(targets, new Function<Target, Column>() {
+      @Override
+      public Column apply(@javax.annotation.Nullable Target target) {
+        return new Column(target.hasAlias() ? target.getAlias() : target.getEvalTree().getName(),
+            getDomainByExpr(inputSchema, target.getEvalTree()));
+      }
+    }).build();
   }
 
   public static String columnsToStr(Collection<Column> columns) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
index 8bccdc4..2e2e7b5 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
@@ -290,7 +290,7 @@ public class PythonScriptEngine extends TajoScriptEngine {
   private final TableMeta pipeMeta = CatalogUtil.newTableMeta("TEXT");
 
   private final Tuple EMPTY_INPUT = new VTuple(0);
-  private final Schema EMPTY_SCHEMA = SchemaFactory.newV1();
+  private final Schema EMPTY_SCHEMA = SchemaBuilder.builder().build();
 
   public PythonScriptEngine(FunctionDesc functionDesc) {
     if (!functionDesc.getInvocation().hasPython() && !functionDesc.getInvocation().hasPythonAggregation()) {
@@ -388,27 +388,31 @@ public class PythonScriptEngine extends TajoScriptEngine {
   private void setSchema() {
     if (invocationDesc.isScalarFunction()) {
       TajoDataTypes.DataType[] paramTypes = functionSignature.getParamTypes();
-      inSchema = SchemaFactory.newV1();
+      SchemaBuilder inSchemaBuilder = SchemaBuilder.builder();
       for (int i = 0; i < paramTypes.length; i++) {
-        inSchema.addColumn(new Column("in_" + i, paramTypes[i]));
+        inSchemaBuilder.add(new Column("in_" + i, paramTypes[i]));
       }
-      outSchema = SchemaFactory.newV1(new Column[]{new Column("out", functionSignature.getReturnType())});
+      inSchema = inSchemaBuilder.build();
+      outSchema = SchemaBuilder.builder()
+          .addAll(new Column[]{new Column("out", functionSignature.getReturnType())})
+          .build();
     } else {
       // UDAF
       if (firstPhase) {
         // first phase
         TajoDataTypes.DataType[] paramTypes = functionSignature.getParamTypes();
-        inSchema = SchemaFactory.newV1();
+        SchemaBuilder inSchemaBuilder = SchemaBuilder.builder();
         for (int i = 0; i < paramTypes.length; i++) {
-          inSchema.addColumn(new Column("in_" + i, paramTypes[i]));
+          inSchemaBuilder.add(new Column("in_" + i, paramTypes[i]));
         }
-        outSchema = SchemaFactory.newV1(new Column[]{new Column("json", TajoDataTypes.Type.TEXT)});
+        inSchema = inSchemaBuilder.build();
+        outSchema = SchemaBuilder.builder().add(new Column("json", TajoDataTypes.Type.TEXT)).build();
       } else if (lastPhase) {
-        inSchema = SchemaFactory.newV1(new Column[]{new Column("json", TajoDataTypes.Type.TEXT)});
-        outSchema = SchemaFactory.newV1(new Column[]{new Column("out", functionSignature.getReturnType())});
+        inSchema = SchemaBuilder.builder().add(new Column("json", TajoDataTypes.Type.TEXT)).build();
+        outSchema = SchemaBuilder.builder().add(new Column("out", functionSignature.getReturnType())).build();
       } else {
         // intermediate phase
-        inSchema = outSchema = SchemaFactory.newV1(new Column[]{new Column("json", TajoDataTypes.Type.TEXT)});
+        inSchema = outSchema = SchemaBuilder.builder().add(new Column("json", TajoDataTypes.Type.TEXT)).build();
       }
     }
     projectionCols = new int[outSchema.size()];

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateTableNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateTableNode.java
index 3fed563..bb24b22 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateTableNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateTableNode.java
@@ -22,6 +22,7 @@ import com.google.common.base.Objects;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SchemaFactory;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.plan.PlanString;
 import org.apache.tajo.util.TUtil;
 
@@ -42,8 +43,8 @@ public class CreateTableNode extends StoreTableNode implements Cloneable {
 
   public Schema getLogicalSchema() {
     if (hasPartition()) {
-      Schema logicalSchema = SchemaFactory.newV1(tableSchema);
-      logicalSchema.addColumns(getPartitionMethod().getExpressionSchema());
+      Schema logicalSchema = SchemaUtil.merge(tableSchema, getPartitionMethod().getExpressionSchema());
+      logicalSchema.setQualifier(tableName);
       return logicalSchema;
     } else {
       return tableSchema;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-plan/src/main/java/org/apache/tajo/plan/logical/InsertNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/InsertNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/InsertNode.java
index a993d64..de99800 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/InsertNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/InsertNode.java
@@ -19,9 +19,7 @@
 package org.apache.tajo.plan.logical;
 
 import com.google.gson.annotations.Expose;
-
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaFactory;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.plan.PlanString;
 import org.apache.tajo.util.TUtil;
@@ -124,9 +122,9 @@ public class InsertNode extends StoreTableNode implements Cloneable {
   public Object clone() throws CloneNotSupportedException {
     InsertNode insertNode = (InsertNode) super.clone();
     insertNode.overwrite = overwrite;
-    insertNode.tableSchema = SchemaFactory.newV1(tableSchema);
-    insertNode.targetSchema = targetSchema != null ? SchemaFactory.newV1(targetSchema) : null;
-    insertNode.projectedSchema = projectedSchema != null ? SchemaFactory.newV1(projectedSchema) : null;
+    insertNode.tableSchema = (Schema) tableSchema.clone();
+    insertNode.targetSchema = targetSchema != null ? (Schema) targetSchema.clone() : null;
+    insertNode.projectedSchema = projectedSchema != null ? (Schema) projectedSchema.clone() : null;
     insertNode.uri = uri != null ? uri : null;
     return insertNode;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java
index 5427ba6..0729d6e 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java
@@ -81,7 +81,7 @@ public class ScanNode extends RelationNode implements Projectable, SelectableNod
     String qualifiedAlias = CatalogUtil.buildFQName(databaseName, alias);
     this.setInSchema(tableDesc.getSchema());
     this.getInSchema().setQualifier(qualifiedAlias);
-    this.setOutSchema(SchemaFactory.newV1(getInSchema()));
+    this.setOutSchema(SchemaBuilder.builder().addAll(getInSchema().getRootColumns()).build());
     logicalSchema = SchemaUtil.getQualifiedLogicalSchema(tableDesc, qualifiedAlias);
 	}
 	

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java
index c5ca1ef..60187d9 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.plan.rewrite;
 
+import com.google.common.base.Function;
 import com.google.common.base.Objects;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.algebra.*;
@@ -43,6 +44,7 @@ import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.graph.DirectedGraphVisitor;
 import org.apache.tajo.util.graph.SimpleDirectedGraph;
 
+import javax.annotation.Nullable;
 import java.util.*;
 
 /**
@@ -401,7 +403,6 @@ public class SelfDescSchemaBuildPhase extends LogicalPlanPreprocessPhase {
     private Schema buildSchemaFromColumnSet(Set<Column> columns) throws TajoException {
       SchemaGraph schemaGraph = new SchemaGraph();
       Set<ColumnVertex> rootVertexes = new HashSet<>();
-      Schema schema = SchemaFactory.newV1();
 
       Set<Column> simpleColumns = new HashSet<>();
       List<Column> columnList = new ArrayList<>(columns);
@@ -446,21 +447,20 @@ public class SelfDescSchemaBuildPhase extends LogicalPlanPreprocessPhase {
         }
       }
 
+      SchemaBuilder schema = SchemaBuilder.uniqueNameBuilder();
       // Build record columns
       RecordColumnBuilder builder = new RecordColumnBuilder(schemaGraph);
       for (ColumnVertex eachRoot : rootVertexes) {
         schemaGraph.accept(null, eachRoot, builder);
-        schema.addColumn(eachRoot.column);
+        schema.add(eachRoot.column);
       }
 
       // Add simple columns
       for (Column eachColumn : simpleColumns) {
-        if (!schema.contains(eachColumn)) {
-          schema.addColumn(eachColumn);
-        }
+        schema.add(eachColumn);
       }
 
-      return schema;
+      return schema.build();
     }
 
     private static class ColumnVertex {
@@ -523,11 +523,15 @@ public class SelfDescSchemaBuildPhase extends LogicalPlanPreprocessPhase {
         if (graph.isLeaf(schemaVertex)) {
           schemaVertex.column = new Column(schemaVertex.name, schemaVertex.type);
         } else {
-          Schema schema = SchemaFactory.newV1();
-          for (ColumnVertex eachChild : graph.getChilds(schemaVertex)) {
-            schema.addColumn(eachChild.column);
-          }
-          schemaVertex.column = new Column(schemaVertex.name, new TypeDesc(schema));
+          SchemaBuilder schema = SchemaBuilder.builder()
+              .addAll(graph.getChilds(schemaVertex), new Function<ColumnVertex, Column>() {
+                @Override
+                public Column apply(@Nullable ColumnVertex input) {
+                  return input.column;
+                }
+              });
+
+          schemaVertex.column = new Column(schemaVertex.name, new TypeDesc(schema.build()));
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/IndexScanInfo.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/IndexScanInfo.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/IndexScanInfo.java
index 26c27b3..2c197c2 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/IndexScanInfo.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/IndexScanInfo.java
@@ -18,11 +18,9 @@
 
 package org.apache.tajo.plan.rewrite.rules;
 
+import com.google.common.base.Function;
 import com.google.gson.annotations.Expose;
-import org.apache.tajo.catalog.IndexDesc;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaFactory;
-import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.datum.Datum;
@@ -30,6 +28,7 @@ import org.apache.tajo.plan.serder.EvalNodeDeserializer;
 import org.apache.tajo.plan.serder.EvalNodeSerializer;
 import org.apache.tajo.plan.serder.PlanProto.SimplePredicateProto;
 
+import javax.annotation.Nullable;
 import java.net.URI;
 
 public class IndexScanInfo extends AccessPathInfo {
@@ -93,11 +92,13 @@ public class IndexScanInfo extends AccessPathInfo {
   public IndexScanInfo(TableStats tableStats, IndexDesc indexDesc, SimplePredicate[] predicates) {
     super(ScanTypeControl.INDEX_SCAN, tableStats);
     this.indexPath = indexDesc.getIndexPath();
-    keySchema = SchemaFactory.newV1();
     this.predicates = predicates;
-    for (SimplePredicate predicate : predicates) {
-      keySchema.addColumn(predicate.getKeySortSpec().getSortKey());
-    }
+    keySchema = SchemaBuilder.builder().addAll(predicates, new Function<SimplePredicate, Column>() {
+      @Override
+      public Column apply(@Nullable SimplePredicate p) {
+        return p.getKeySortSpec().getSortKey();
+      }
+    }).build();
   }
 
   public URI getIndexPath() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
index 00580ae..cf92ea0 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
@@ -334,10 +334,9 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule {
     TableDesc table = scanNode.getTableDesc();
     PartitionMethodDesc partitionDesc = scanNode.getTableDesc().getPartitionMethod();
 
-    Schema paritionValuesSchema = SchemaFactory.newV1();
-    for (Column column : partitionDesc.getExpressionSchema().getRootColumns()) {
-      paritionValuesSchema.addColumn(column);
-    }
+    Schema paritionValuesSchema = SchemaBuilder.builder()
+        .addAll(partitionDesc.getExpressionSchema().getRootColumns())
+        .build();
 
     Set<EvalNode> indexablePredicateSet = Sets.newHashSet();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
index eccd37a..ff3e3c8 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -18,16 +18,16 @@
 
 package org.apache.tajo.plan.util;
 
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.exception.TajoException;
-import org.apache.tajo.exception.TajoInternalError;
-import org.apache.tajo.exception.UndefinedTableException;
+import org.apache.tajo.exception.*;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.Target;
 import org.apache.tajo.plan.expr.*;
@@ -565,12 +565,12 @@ public class PlannerUtil {
   }
 
   public static Schema sortSpecsToSchema(SortSpec[] sortSpecs) {
-    Schema schema = SchemaFactory.newV1();
-    for (SortSpec spec : sortSpecs) {
-      schema.addColumn(spec.getSortKey());
-    }
-
-    return schema;
+    return SchemaBuilder.builder().addAll(sortSpecs, new Function<SortSpec, Column>() {
+      @Override
+      public Column apply(@javax.annotation.Nullable SortSpec s) {
+        return s.getSortKey();
+      }
+    }).build();
   }
 
   public static SortSpec[][] getSortKeysFromJoinQual(EvalNode joinQual, Schema outer, Schema inner) {
@@ -656,21 +656,26 @@ public class PlannerUtil {
   }
 
   public static Schema targetToSchema(List<Target> targets) {
-    Schema schema = SchemaFactory.newV1();
+    SchemaBuilder schema = SchemaBuilder.uniqueNameBuilder();
     for (Target t : targets) {
       DataType type = t.getEvalTree().getValueType();
+
+      // hack to avoid projecting record type.
+      if (type.getType() == TajoDataTypes.Type.RECORD) {
+        throw new TajoRuntimeException(new NotImplementedException("record projection"));
+      }
+
       String name;
       if (t.hasAlias()) {
         name = t.getAlias();
       } else {
         name = t.getEvalTree().getName();
       }
-      if (!schema.containsByQualifiedName(name)) {
-        schema.addColumn(name, type);
-      }
+
+      schema.add(name, type);
     }
 
-    return schema;
+    return schema.build();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-plan/src/test/java/org/apache/tajo/plan/TestLogicalNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/test/java/org/apache/tajo/plan/TestLogicalNode.java b/tajo-plan/src/test/java/org/apache/tajo/plan/TestLogicalNode.java
index f3acb00..3e44e3b 100644
--- a/tajo-plan/src/test/java/org/apache/tajo/plan/TestLogicalNode.java
+++ b/tajo-plan/src/test/java/org/apache/tajo/plan/TestLogicalNode.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaFactory;
+import org.apache.tajo.catalog.SchemaBuilder;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.plan.logical.GroupbyNode;
 import org.apache.tajo.plan.logical.JoinNode;
@@ -36,10 +36,11 @@ public class TestLogicalNode {
 
   @Test
   public void testEquals() {
-    Schema schema = SchemaFactory.newV1();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("name", Type.TEXT);
-    schema.addColumn("age", Type.INT2);
+    Schema schema = SchemaBuilder.builder()
+        .add("id", Type.INT4)
+        .add("name", Type.TEXT)
+        .add("age", Type.INT2)
+        .build();
     GroupbyNode groupbyNode = new GroupbyNode(0);
     groupbyNode.setGroupingColumns(new Column[]{schema.getColumn(1), schema.getColumn(2)});
     ScanNode scanNode = new ScanNode(0);

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
index ec20aca..1fa6214 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -21,7 +21,7 @@ package org.apache.tajo.storage;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaFactory;
+import org.apache.tajo.catalog.SchemaBuilder;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.ColumnStats;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -166,7 +166,7 @@ public class MergeScanner implements Scanner {
 
   @Override
   public void setTarget(Column[] targets) {
-    this.target = SchemaFactory.newV1(targets);
+    this.target = SchemaBuilder.builder().addAll(targets).build();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/4aef83a3/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java
index 560e642..be30fe0 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java
@@ -18,10 +18,11 @@
 
 package org.apache.tajo.storage;
 
+import com.google.common.base.Function;
 import com.google.common.base.Objects;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaFactory;
-import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.*;
+
+import javax.annotation.Nullable;
 
 /**
  * It represents a pair of start and end tuples.
@@ -39,12 +40,12 @@ public class TupleRange implements Comparable<TupleRange>, Cloneable {
   }
 
   public static Schema sortSpecsToSchema(SortSpec[] sortSpecs) {
-    Schema schema = SchemaFactory.newV1();
-    for (SortSpec spec : sortSpecs) {
-      schema.addColumn(spec.getSortKey());
-    }
-
-    return schema;
+    return SchemaBuilder.builder().addAll(sortSpecs, new Function<SortSpec, Column>() {
+      @Override
+      public Column apply(@Nullable SortSpec input) {
+        return input.getSortKey();
+      }
+    }).build();
   }
 
   public void setStart(Tuple tuple) {