You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2016/03/28 06:07:28 UTC

[5/5] tajo git commit: TAJO-2099: Implement an Adapter for legacy Schema.

TAJO-2099: Implement an Adapter for legacy Schema.

Closes #981


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8dad551e
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8dad551e
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8dad551e

Branch: refs/heads/master
Commit: 8dad551ec54ee33f40bc355376ebcb4b370a5ada
Parents: 6826358
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Mar 28 13:06:51 2016 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Mar 28 13:06:51 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../org/apache/tajo/catalog/CatalogUtil.java    |   2 +-
 .../java/org/apache/tajo/catalog/IndexDesc.java |   2 +-
 .../java/org/apache/tajo/catalog/Schema.java    | 479 ++--------------
 .../org/apache/tajo/catalog/SchemaFactory.java  |  48 ++
 .../org/apache/tajo/catalog/SchemaLegacy.java   | 555 +++++++++++++++++++
 .../org/apache/tajo/catalog/SchemaUtil.java     |   8 +-
 .../java/org/apache/tajo/catalog/TableDesc.java |   4 +-
 .../tajo/catalog/json/CatalogGsonHelper.java    |   8 +-
 .../apache/tajo/catalog/json/SchemaAdapter.java |  57 ++
 .../catalog/partition/PartitionMethodDesc.java  |   3 +-
 .../org/apache/tajo/catalog/TestIndexDesc.java  |   2 +-
 .../org/apache/tajo/catalog/TestSchema.java     |  58 +-
 .../org/apache/tajo/catalog/TestTableDesc.java  |   4 +-
 .../org/apache/tajo/catalog/TestTableMeta.java  |   8 +-
 .../tajo/catalog/store/HiveCatalogStore.java    |   6 +-
 .../catalog/store/TestHiveCatalogStore.java     |  24 +-
 .../tajo/catalog/store/AbstractDBStore.java     |   6 +-
 .../apache/tajo/catalog/CatalogTestingUtil.java |   4 +-
 .../org/apache/tajo/catalog/TestCatalog.java    |  42 +-
 .../TestCatalogAgainstCaseSensitivity.java      |   6 +-
 .../tajo/catalog/TestCatalogExceptions.java     |   2 +-
 .../tajo/catalog/TestLinkedMetadataManager.java |  10 +-
 .../org/apache/tajo/client/QueryClientImpl.java |   3 +-
 .../org/apache/tajo/client/TajoClientUtil.java  |   7 +-
 .../apache/tajo/jdbc/TajoMetaDataResultSet.java |   3 +-
 .../org/apache/tajo/BackendTestingUtil.java     |   3 +-
 .../org/apache/tajo/json/CommonGsonHelper.java  |   4 +-
 .../java/org/apache/tajo/json/GsonHelper.java   |   6 +-
 .../apache/tajo/cli/tools/TestDDLBuilder.java   |   8 +-
 .../TestCatalogAdminClientExceptions.java       |   7 +-
 .../engine/codegen/TestEvalCodeGenerator.java   |  15 +-
 .../apache/tajo/engine/eval/TestEvalTree.java   |   5 +-
 .../tajo/engine/eval/TestEvalTreeUtil.java      |   2 +-
 .../apache/tajo/engine/eval/TestPredicates.java |  31 +-
 .../tajo/engine/eval/TestSQLExpression.java     |  23 +-
 .../engine/function/TestBuiltinFunctions.java   |  51 +-
 .../function/TestConditionalExpressions.java    |   9 +-
 .../engine/function/TestDateTimeFunctions.java  |  13 +-
 .../tajo/engine/function/TestMathFunctions.java |  57 +-
 .../function/TestPatternMatchingPredicates.java |   3 +-
 .../TestStringOperatorsAndFunctions.java        |  53 +-
 .../function/TestUserDefinedFunctions.java      |   3 +-
 .../engine/planner/TestJoinOrderAlgorithm.java  |   8 +-
 .../engine/planner/TestLogicalOptimizer.java    |   6 +-
 .../tajo/engine/planner/TestLogicalPlanner.java |  14 +-
 .../tajo/engine/planner/TestPlannerUtil.java    |  18 +-
 .../planner/TestUniformRangePartition.java      |  73 +--
 .../planner/physical/TestExternalSortExec.java  |   2 +-
 .../physical/TestFullOuterHashJoinExec.java     |   8 +-
 .../physical/TestFullOuterMergeJoinExec.java    |  10 +-
 .../planner/physical/TestHashAntiJoinExec.java  |   4 +-
 .../planner/physical/TestHashJoinExec.java      |   4 +-
 .../planner/physical/TestHashSemiJoinExec.java  |   4 +-
 .../physical/TestLeftOuterHashJoinExec.java     |   8 +-
 .../planner/physical/TestMergeJoinExec.java     |   4 +-
 .../planner/physical/TestPhysicalPlanner.java   |   4 +-
 .../physical/TestProgressExternalSortExec.java  |   2 +-
 .../physical/TestRightOuterHashJoinExec.java    |   6 +-
 .../physical/TestRightOuterMergeJoinExec.java   |  10 +-
 .../engine/planner/physical/TestSortExec.java   |   4 +-
 .../planner/physical/TestSortIntersectExec.java |   4 +-
 .../planner/physical/TestTupleSorter.java       |   3 +-
 .../planner/physical/TestUnSafeTuple.java       |   3 +-
 .../tajo/engine/query/TestGroupByQuery.java     |   9 +-
 .../tajo/engine/query/TestHBaseTable.java       |  19 +-
 .../apache/tajo/engine/query/TestJoinQuery.java |  17 +-
 .../tajo/engine/query/TestNullValues.java       |  11 +-
 .../tajo/engine/query/TestSelectQuery.java      |   3 +-
 .../apache/tajo/engine/query/TestSortQuery.java |  17 +-
 .../tajo/engine/query/TestTablePartitions.java  |   7 +-
 .../tajo/engine/query/TestWindowQuery.java      |  11 +-
 .../apache/tajo/engine/util/TestTupleUtil.java  |   9 +-
 .../org/apache/tajo/storage/TestRowFile.java    |   3 +-
 .../tajo/ws/rs/resources/RestTestUtils.java     |  64 ---
 .../ws/rs/resources/TestClusterResource.java    |   3 +-
 .../ws/rs/resources/TestDatabasesResource.java  |   3 +-
 .../ws/rs/resources/TestFunctionsResource.java  |   3 +-
 .../tajo/ws/rs/resources/TestQueryResource.java |   3 +-
 .../rs/resources/TestQueryResultResource.java   |   3 +-
 .../ws/rs/resources/TestSessionsResource.java   |   3 +-
 .../ws/rs/resources/TestTablesResource.java     |   3 +-
 .../java/org/apache/tajo/benchmark/TPCH.java    |  23 +-
 .../apache/tajo/engine/json/CoreGsonHelper.java |  24 +-
 .../tajo/engine/planner/global/DataChannel.java |   3 +-
 .../global/builder/DistinctGroupbyBuilder.java  |   3 +-
 .../planner/physical/BSTIndexScanExec.java      |   5 +-
 .../planner/physical/ColPartitionStoreExec.java |   7 +-
 .../engine/planner/physical/SeqScanExec.java    |   2 +-
 .../engine/planner/physical/WindowAggExec.java  |   5 +-
 .../tajo/master/TajoMasterClientService.java    |   2 +-
 .../exec/ExplainPlanPreprocessorForTest.java    |   3 +-
 .../apache/tajo/master/exec/QueryExecutor.java  |   2 +-
 .../apache/tajo/querymaster/Repartitioner.java  |   2 +-
 .../org/apache/tajo/ws/rs/TajoRestService.java  |  43 +-
 .../apache/tajo/jdbc/TajoDatabaseMetaData.java  |   2 +-
 .../org/apache/tajo/jdbc/TestResultSet.java     |   4 +-
 .../org/apache/tajo/plan/LogicalPlanner.java    |  14 +-
 .../org/apache/tajo/plan/expr/EvalTreeUtil.java |   3 +-
 .../function/python/PythonScriptEngine.java     |  16 +-
 .../tajo/plan/logical/CreateTableNode.java      |   3 +-
 .../apache/tajo/plan/logical/InsertNode.java    |   7 +-
 .../org/apache/tajo/plan/logical/ScanNode.java  |   7 +-
 .../plan/rewrite/SelfDescSchemaBuildPhase.java  |   4 +-
 .../tajo/plan/rewrite/rules/IndexScanInfo.java  |   3 +-
 .../rewrite/rules/PartitionedTableRewriter.java |   2 +-
 .../plan/serder/LogicalNodeDeserializer.java    |  11 +-
 .../apache/tajo/plan/serder/PlanGsonHelper.java |   7 +-
 .../org/apache/tajo/plan/util/PlannerUtil.java  |   4 +-
 .../org/apache/tajo/plan/TestLogicalNode.java   |   3 +-
 .../tajo/storage/BaseTupleComparator.java       |   3 +-
 .../org/apache/tajo/storage/MergeScanner.java   |   3 +-
 .../org/apache/tajo/storage/TupleRange.java     |   3 +-
 .../org/apache/tajo/storage/TestLazyTuple.java  |   5 +-
 .../tajo/storage/TestTupleComparator.java       |   7 +-
 .../tajo/storage/hbase/TestColumnMapping.java   |   3 +-
 .../apache/tajo/storage/index/bst/BSTIndex.java |   3 +-
 .../tajo/storage/parquet/ParquetScanner.java    |   3 +-
 .../storage/parquet/TajoSchemaConverter.java    |   3 +-
 .../storage/thirdparty/orc/OrcRecordReader.java |   3 +-
 .../tajo/storage/TestCompressionStorages.java   |   3 +-
 .../tajo/storage/TestDelimitedTextFile.java     |   3 +-
 .../apache/tajo/storage/TestFileSystems.java    |   3 +-
 .../apache/tajo/storage/TestFileTablespace.java |   9 +-
 .../org/apache/tajo/storage/TestLineReader.java |   9 +-
 .../apache/tajo/storage/TestMergeScanner.java   |   5 +-
 .../org/apache/tajo/storage/TestStorages.java   |  51 +-
 .../apache/tajo/storage/index/TestBSTIndex.java |  24 +-
 .../index/TestSingleCSVFileBSTIndex.java        |   6 +-
 .../apache/tajo/storage/json/TestJsonSerDe.java |   5 +-
 .../tajo/storage/parquet/TestReadWrite.java     |   3 +-
 .../storage/parquet/TestSchemaConverter.java    |   5 +-
 .../tajo/storage/raw/TestDirectRawFile.java     |   7 +-
 .../storage/jdbc/JdbcMetadataProviderBase.java  |   2 +-
 134 files changed, 1302 insertions(+), 1119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/8dad551e/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 3c6bcaf..971d8d8 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@ Release 0.12.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-2099: Implement an Adapter for the legacy Schema. (hyunsik)
+
     TAJO-2091: Error or progress update should use stderr instead of stdout. 
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/8dad551e/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index 94e8157..76990f2 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -351,7 +351,7 @@ public class CatalogUtil {
   * @return
   */
   public static SchemaProto getQualfiedSchema(String tableName, SchemaProto schema) {
-    Schema restored = new Schema(schema);
+    Schema restored = SchemaFactory.newV1(schema);
     restored.setQualifier(tableName);
     return restored.getProto();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8dad551e/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/IndexDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/IndexDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/IndexDesc.java
index 9f64913..ad038e8 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/IndexDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/IndexDesc.java
@@ -58,7 +58,7 @@ public class IndexDesc implements ProtoObject<IndexDescProto>, Cloneable {
           proto.getIndexName(), new URI(proto.getIndexPath()),
           keySortSpecs,
           proto.getIndexMethod(), proto.getIsUnique(), proto.getIsClustered(),
-          new Schema(proto.getTargetRelationSchema()));
+          SchemaFactory.newV1(proto.getTargetRelationSchema()));
     } catch (URISyntaxException e) {
       e.printStackTrace();
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8dad551e/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
index 9574f12..1b4c1eb 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
@@ -18,113 +18,16 @@
 
 package org.apache.tajo.catalog;
 
-import com.google.common.base.Objects;
-import com.google.common.collect.ImmutableList;
-import com.google.gson.annotations.Expose;
-import org.apache.tajo.catalog.SchemaUtil.ColumnVisitor;
-import org.apache.tajo.catalog.json.CatalogGsonHelper;
-import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
 import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.exception.DuplicateColumnException;
-import org.apache.tajo.exception.TajoRuntimeException;
 import org.apache.tajo.json.GsonObject;
-import org.apache.tajo.util.StringUtils;
 
-import java.util.*;
+import java.util.Collection;
+import java.util.List;
 
-public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
-
-	@Expose protected List<Column> fields = null;
-	@Expose protected Map<String, Integer> fieldsByQualifiedName = null;
-  @Expose protected Map<String, List<Integer>> fieldsByName = null;
-
-	public Schema() {
-    init();
-	}
-
-  /**
-   * This Schema constructor restores a serialized schema into in-memory Schema structure.
-   * A serialized schema is an ordered list in depth-first order over a nested schema.
-   * This constructor transforms the list into a tree-like structure.
-   *
-   * @param proto
-   */
-	public Schema(SchemaProto proto) {
-    init();
-
-    List<Column> toBeAdded = new ArrayList<>();
-    for (int i = 0; i < proto.getFieldsCount(); i++) {
-      deserializeColumn(toBeAdded, proto.getFieldsList(), i);
-    }
-
-    for (Column c : toBeAdded) {
-      addColumn(c);
-    }
-  }
-
-  /**
-   * This method transforms a list of ColumnProtos into a schema tree.
-   * It assumes that <code>protos</code> contains a list of ColumnProtos in the depth-first order.
-   *
-   * @param tobeAdded
-   * @param protos
-   * @param serializedColumnIndex
-   */
-  private static void deserializeColumn(List<Column> tobeAdded, List<ColumnProto> protos, int serializedColumnIndex) {
-    ColumnProto columnProto = protos.get(serializedColumnIndex);
-    if (columnProto.getDataType().getType() == Type.RECORD) {
-
-      // Get the number of child fields
-      int childNum = columnProto.getDataType().getNumNestedFields();
-      // where is start index of nested fields?
-      int childStartIndex = tobeAdded.size() - childNum;
-      // Extract nested fields
-      List<Column> nestedColumns = new ArrayList<>(tobeAdded.subList(childStartIndex, childStartIndex + childNum));
-
-      // Remove nested fields from the the current level
-      for (int i = 0; i < childNum; i++) {
-        tobeAdded.remove(tobeAdded.size() - 1);
-      }
-
-      // Add the nested fields to the list as a single record column
-      tobeAdded.add(new Column(columnProto.getName(), new TypeDesc(new Schema(nestedColumns))));
-    } else {
-      tobeAdded.add(new Column(protos.get(serializedColumnIndex)));
-    }
-  }
-
-	public Schema(Schema schema) {
-	  this();
-
-		this.fields.addAll(schema.fields);
-		this.fieldsByQualifiedName.putAll(schema.fieldsByQualifiedName);
-    this.fieldsByName.putAll(schema.fieldsByName);
-	}
-
-  public Schema(Column [] columns) {
-    init();
-
-    for(Column c : columns) {
-      addColumn(c);
-    }
-  }
-
-	public Schema(Iterable<Column> columns) {
-    init();
-
-    for(Column c : columns) {
-      addColumn(c);
-    }
-  }
-
-  private void init() {
-    this.fields = new ArrayList<>();
-    this.fieldsByQualifiedName = new HashMap<>();
-    this.fieldsByName = new HashMap<>();
-  }
+public interface Schema extends ProtoObject<SchemaProto>, Cloneable, GsonObject {
 
   /**
    * Set a qualifier to this schema.
@@ -132,45 +35,15 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
    *
    * @param qualifier The qualifier
    */
-  public void setQualifier(String qualifier) {
-    // only change root fields, and must keep each nested field simple name
-    List<Column> columns = getRootColumns();
-
-    fields.clear();
-    fieldsByQualifiedName.clear();
-    fieldsByName.clear();
-
-    Column newColumn;
-    for (Column c : columns) {
-      newColumn = new Column(qualifier + "." + c.getSimpleName(), c.typeDesc);
-      addColumn(newColumn);
-    }
-  }
+  void setQualifier(String qualifier);
 	
-	public int size() {
-		return this.fields.size();
-	}
+	int size();
 
-  public Column getColumn(int id) {
-    return fields.get(id);
-  }
+  Column getColumn(int id);
 
-  public Column getColumn(Column column) {
-    int idx = getIndex(column);
-    return idx >= 0 ? fields.get(idx) : null;
-  }
+  Column getColumn(Column column);
 
-  public int getIndex(Column column) {
-    if (!contains(column)) {
-      return -1;
-    }
-
-    if (column.hasQualifier()) {
-      return fieldsByQualifiedName.get(column.getQualifiedName());
-    } else {
-      return fieldsByName.get(column.getSimpleName()).get(0);
-    }
-  }
+  int getIndex(Column column);
 
   /**
    * Get a column by a given name.
@@ -178,224 +51,35 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
    * @param name The column name to be found.
    * @return The column matched to a given column name.
    */
-  public Column getColumn(String name) {
-
-    if (NestedPathUtil.isPath(name)) {
-
-      // TODO - to be refactored
-      if (fieldsByQualifiedName.containsKey(name)) {
-        Column flattenColumn = fields.get(fieldsByQualifiedName.get(name));
-        if (flattenColumn != null) {
-          return flattenColumn;
-        }
-      }
-
-      String [] paths = name.split(NestedPathUtil.PATH_DELIMITER);
-      Column column = getColumn(paths[0]);
-      if (column == null) {
-        return null;
-      }
-      Column actualColumn = NestedPathUtil.lookupPath(column, paths);
+  Column getColumn(String name);
 
-      Column columnPath = new Column(
-          column.getQualifiedName() + NestedPathUtil.makePath(paths, 1),
-          actualColumn.typeDesc);
+	int getColumnId(String name);
 
-      return columnPath;
-    } else {
-      String[] parts = name.split("\\.");
-      // Some of the string can includes database name and table name and column name.
-      // For example, it can be 'default.table1.id'.
-      // Therefore, spilt string array length can be 3.
-      if (parts.length >= 2) {
-        return getColumnByQName(name);
-      } else {
-        return getColumnByName(name);
-      }
-    }
-  }
-
-  /**
-   * Find a column by a qualified name (e.g., table1.col1).
-   *
-   * @param qualifiedName The qualified name
-   * @return The Column matched to a given qualified name
-   */
-  private Column getColumnByQName(String qualifiedName) {
-		Integer cid = fieldsByQualifiedName.get(qualifiedName);
-		return cid != null ? fields.get(cid) : null;
-	}
-
-  /**
-   * Find a column by a name (e.g., col1).
-   * The same name columns can be exist in a schema. For example, table1.col1 and table2.col1 coexist in a schema.
-   * In this case, it will throw {@link java.lang.RuntimeException}. But, it occurs rarely because all column names
-   * except for alias have a qualified form.
-   *
-   * @param columnName The column name without qualifier
-   * @return The Column matched to a given name.
-   */
-	private Column getColumnByName(String columnName) {
-    String normalized = columnName;
-	  List<Integer> list = fieldsByName.get(normalized);
-
-    if (list == null || list.size() == 0) {
-      return null;
-    }
-
-    if (list.size() == 1) {
-      return fields.get(list.get(0));
-    } else {
-      throw throwAmbiguousFieldException(list);
-    }
-	}
-
-  private RuntimeException throwAmbiguousFieldException(Collection<Integer> idList) {
-    StringBuilder sb = new StringBuilder();
-    boolean first = true;
-    for (Integer id : idList) {
-      if (first) {
-        first = false;
-      } else {
-        sb.append(", ");
-      }
-      sb.append(fields.get(id));
-    }
-    throw new RuntimeException("Ambiguous Column Name Access: " + sb.toString());
-  }
-	
-	public int getColumnId(String name) {
-    // if the same column exists, immediately return that column.
-    if (fieldsByQualifiedName.containsKey(name)) {
-      return fieldsByQualifiedName.get(name);
-    }
-
-    // The following is some workaround code.
-    List<Integer> list = fieldsByName.get(name);
-    if (list == null) {
-      return -1;
-    } else  if (list.size() == 1) {
-      return fieldsByName.get(name).get(0);
-    } else if (list.size() == 0) {
-      return -1;
-    } else { // if list.size > 2
-      throw throwAmbiguousFieldException(list);
-    }
-	}
-
-  public int getColumnIdByName(String colName) {
-    for (Column col : fields) {
-      if (col.getSimpleName().equals(colName)) {
-        String qualifiedName = col.getQualifiedName();
-        return fieldsByQualifiedName.get(qualifiedName);
-      }
-    }
-    return -1;
-  }
+  int getColumnIdByName(String colName);
 
   /**
    * Get root columns, meaning all columns except for nested fields.
    *
    * @return A list of root columns
    */
-	public List<Column> getRootColumns() {
-		return ImmutableList.copyOf(fields);
-	}
+	List<Column> getRootColumns();
 
   /**
    * Get all columns, including all nested fields
    *
    * @return A list of all columns
    */
-  public List<Column> getAllColumns() {
-    final List<Column> columnList = new ArrayList<>();
-
-    SchemaUtil.visitSchema(this, new ColumnVisitor() {
-      @Override
-      public void visit(int depth, List<String> path, Column column) {
-        if (path.size() > 0) {
-          String parentPath = StringUtils.join(path, NestedPathUtil.PATH_DELIMITER);
-          String currentPath = parentPath + NestedPathUtil.PATH_DELIMITER + column.getSimpleName();
-          columnList.add(new Column(currentPath, column.getTypeDesc()));
-        } else {
-          columnList.add(column);
-        }
-      }
-    });
-
-    return columnList;
-  }
-
-  public boolean contains(String name) {
-    // TODO - It's a hack
-    if (NestedPathUtil.isPath(name)) {
-      return (getColumn(name) != null);
-    }
-
-    if (fieldsByQualifiedName.containsKey(name)) {
-      return true;
-    }
-    if (fieldsByName.containsKey(name)) {
-      if (fieldsByName.get(name).size() > 1) {
-        throw new RuntimeException("Ambiguous Column name");
-      }
-      return true;
-    }
+  List<Column> getAllColumns();
 
-    return false;
-  }
+  boolean contains(String name);
 
-  public boolean contains(Column column) {
-    // TODO - It's a hack
-    if (NestedPathUtil.isPath(column.getQualifiedName())) {
-      return (getColumn(column.getQualifiedName()) != null);
-    }
-
-    if (column.hasQualifier()) {
-      return fieldsByQualifiedName.containsKey(column.getQualifiedName());
-    } else {
-      if (fieldsByName.containsKey(column.getSimpleName())) {
-        int num = fieldsByName.get(column.getSimpleName()).size();
-        if (num == 0) {
-          throw new IllegalStateException("No such column name: " + column.getSimpleName());
-        }
-        if (num > 1) {
-          throw new RuntimeException("Ambiguous column name: " + column.getSimpleName());
-        }
-        return true;
-      }
-      return false;
-    }
-  }
+  boolean contains(Column column);
 	
-	public boolean containsByQualifiedName(String qualifiedName) {
-		return fieldsByQualifiedName.containsKey(qualifiedName);
-	}
-
-  public boolean containsByName(String colName) {
-    return fieldsByName.containsKey(colName);
-  }
+	boolean containsByQualifiedName(String qualifiedName);
 
-  public boolean containsAll(Collection<Column> columns) {
-    boolean containFlag = true;
+  boolean containsByName(String colName);
 
-    for (Column c :columns) {
-      if (NestedPathUtil.isPath(c.getSimpleName())) {
-        if (contains(c.getQualifiedName())) {
-          containFlag &= true;
-        } else {
-          String[] paths = c.getQualifiedName().split("/");
-          boolean existRootPath = contains(paths[0]);
-          boolean existLeafPath = getColumn(c.getSimpleName()) != null;
-          containFlag &= existRootPath && existLeafPath;
-        }
-      } else {
-        containFlag &= fields.contains(c);
-      }
-    }
-
-    return containFlag;
-  }
+  boolean containsAll(Collection<Column> columns);
 
   /**
    * Return TRUE if any column in <code>columns</code> is included in this schema.
@@ -404,133 +88,34 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
    * @return true if any column in <code>columns</code> is included in this schema.
    *         Otherwise, false.
    */
-  public boolean containsAny(Collection<Column> columns) {
-    for (Column column : columns) {
-      if (contains(column)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  public synchronized Schema addColumn(String name, TypeDesc typeDesc) {
-    String normalized = name;
-    if(fieldsByQualifiedName.containsKey(normalized)) {
-      throw new TajoRuntimeException(new DuplicateColumnException(normalized));
-    }
+  boolean containsAny(Collection<Column> columns);
 
-    Column newCol = new Column(normalized, typeDesc);
-    fields.add(newCol);
-    fieldsByQualifiedName.put(newCol.getQualifiedName(), fields.size() - 1);
-    List<Integer> inputList = new ArrayList<>();
-    inputList.add(fields.size() - 1);
-    fieldsByName.put(newCol.getSimpleName(), inputList);
+  Schema addColumn(String name, TypeDesc typeDesc);
 
-    return this;
-  }
+  Schema addColumn(String name, Type type);
 
-  public synchronized Schema addColumn(String name, Type type) {
-    return addColumn(name, CatalogUtil.newSimpleDataType(type));
-  }
+  Schema addColumn(String name, Type type, int length);
 
-  public synchronized Schema addColumn(String name, Type type, int length) {
-    return addColumn(name, CatalogUtil.newDataTypeWithLen(type, length));
-  }
-
-  public synchronized Schema addColumn(String name, DataType dataType) {
-		addColumn(name, new TypeDesc(dataType));
-
-		return this;
-	}
+  Schema addColumn(String name, DataType dataType);
 	
-	public synchronized void addColumn(Column column) {
-		addColumn(column.getQualifiedName(), column.typeDesc);
-	}
+	void addColumn(Column column);
 	
-	public synchronized void addColumns(Schema schema) {
-    for(Column column : schema.getRootColumns()) {
-      addColumn(column);
-    }
-  }
+	void addColumns(Schema schema);
 
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(fields, fieldsByQualifiedName, fieldsByName);
-  }
 
   @Override
-	public boolean equals(Object o) {
-		if (o instanceof Schema) {
-		  Schema other = (Schema) o;
-		  return getProto().equals(other.getProto());
-		}
-		return false;
-	}
-	
-  @Override
-  public Object clone() throws CloneNotSupportedException {
-    Schema schema = (Schema) super.clone();
-    schema.init();
+	boolean equals(Object o);
 
-    for(Column column: this.fields) {
-      schema.addColumn(column);
-    }
-    return schema;
-  }
+  Object clone() throws CloneNotSupportedException;
 
 	@Override
-	public SchemaProto getProto() {
-    SchemaProto.Builder builder = SchemaProto.newBuilder();
-    SchemaProtoBuilder recursiveBuilder = new SchemaProtoBuilder(builder);
-    SchemaUtil.visitSchema(this, recursiveBuilder);
-    return builder.build();
-	}
-
-  private static class SchemaProtoBuilder implements ColumnVisitor {
-    private SchemaProto.Builder builder;
-    public SchemaProtoBuilder(SchemaProto.Builder builder) {
-      this.builder = builder;
-    }
-
-    @Override
-    public void visit(int depth, List<String> path, Column column) {
-
-      if (column.getDataType().getType() == Type.RECORD) {
-        DataType.Builder updatedType = DataType.newBuilder(column.getDataType());
-        updatedType.setNumNestedFields(column.typeDesc.nestedRecordSchema.size());
+	SchemaProto getProto();
 
-        ColumnProto.Builder updatedColumn = ColumnProto.newBuilder(column.getProto());
-        updatedColumn.setDataType(updatedType);
-
-        builder.addFields(updatedColumn.build());
-      } else {
-        builder.addFields(column.getProto());
-      }
-    }
-  }
-
-	public String toString() {
-	  StringBuilder sb = new StringBuilder();
-	  sb.append("{(").append(size()).append(") ");
-	  int i = 0;
-	  for(Column col : fields) {
-	    sb.append(col);
-	    if (i < fields.size() - 1) {
-	      sb.append(", ");
-	    }
-	    i++;
-	  }
-	  sb.append("}");
-	  
-	  return sb.toString();
-	}
+  @Override
+	String toString();
 
   @Override
-	public String toJson() {
-	  return CatalogGsonHelper.toJson(this, Schema.class);
-	}
+	String toJson();
 
-  public Column [] toArray() {
-    return this.fields.toArray(new Column[this.fields.size()]);
-  }
+  Column [] toArray();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/8dad551e/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaFactory.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaFactory.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaFactory.java
new file mode 100644
index 0000000..fb6910b
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog;
+
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.exception.TajoInternalError;
+
+public class SchemaFactory {
+  public static Schema newV1() {
+    return new SchemaLegacy();
+  }
+
+  public static Schema newV1(CatalogProtos.SchemaProto proto) {
+    return new SchemaLegacy(proto);
+  }
+
+  public static Schema newV1(Schema schema) {
+    try {
+      return (Schema) schema.clone();
+    } catch (CloneNotSupportedException e) {
+      throw new TajoInternalError(e);
+    }
+  }
+
+  public static Schema newV1(Column [] columns) {
+    return new SchemaLegacy(columns);
+  }
+
+  public static Schema newV1(Iterable<Column> columns) {
+    return new SchemaLegacy(columns);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8dad551e/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaLegacy.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaLegacy.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaLegacy.java
new file mode 100644
index 0000000..f23d519
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaLegacy.java
@@ -0,0 +1,555 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.SchemaUtil.ColumnVisitor;
+import org.apache.tajo.catalog.json.CatalogGsonHelper;
+import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.exception.DuplicateColumnException;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.json.GsonObject;
+import org.apache.tajo.util.StringUtils;
+
+import java.util.*;
+
+public class SchemaLegacy implements Schema, ProtoObject<SchemaProto>, Cloneable, GsonObject {
+
+	@Expose protected List<Column> fields = null;
+	@Expose protected Map<String, Integer> fieldsByQualifiedName = null;
+  @Expose protected Map<String, List<Integer>> fieldsByName = null;
+
+	public SchemaLegacy() {
+    init();
+	}
+
+  /**
+   * This Schema constructor restores a serialized schema into in-memory Schema structure.
+   * A serialized schema is an ordered list in depth-first order over a nested schema.
+   * This constructor transforms the list into a tree-like structure.
+   *
+   * @param proto
+   */
+	public SchemaLegacy(SchemaProto proto) {
+    init();
+
+    List<Column> toBeAdded = new ArrayList<>();
+    for (int i = 0; i < proto.getFieldsCount(); i++) {
+      deserializeColumn(toBeAdded, proto.getFieldsList(), i);
+    }
+
+    for (Column c : toBeAdded) {
+      addColumn(c);
+    }
+  }
+
+  /**
+   * This method transforms a list of ColumnProtos into a schema tree.
+   * It assumes that <code>protos</code> contains a list of ColumnProtos in the depth-first order.
+   *
+   * @param tobeAdded
+   * @param protos
+   * @param serializedColumnIndex
+   */
+  private static void deserializeColumn(List<Column> tobeAdded, List<ColumnProto> protos, int serializedColumnIndex) {
+    ColumnProto columnProto = protos.get(serializedColumnIndex);
+    if (columnProto.getDataType().getType() == Type.RECORD) {
+
+      // Number of direct child fields recorded in the RECORD's data type
+      int childNum = columnProto.getDataType().getNumNestedFields();
+      // The children are the last childNum columns collected so far
+      int childStartIndex = tobeAdded.size() - childNum;
+      // Copy the children out (subList is only a view of tobeAdded)
+      List<Column> nestedColumns = new ArrayList<>(tobeAdded.subList(childStartIndex, childStartIndex + childNum));
+
+      // Remove the nested fields from the current level
+      for (int i = 0; i < childNum; i++) {
+        tobeAdded.remove(tobeAdded.size() - 1);
+      }
+
+      // Re-add the nested fields as a single RECORD column wrapping its own schema
+      tobeAdded.add(new Column(columnProto.getName(), new TypeDesc(new SchemaLegacy(nestedColumns))));
+    } else {
+      tobeAdded.add(new Column(protos.get(serializedColumnIndex)));
+    }
+  }
+
+	public SchemaLegacy(Schema schema) {
+	  this(schema.getRootColumns()); // delegate so THIS instance is initialized and receives a copy of the root columns
+	}
+
+  public SchemaLegacy(Column [] columns) {
+    init();
+
+    for(Column c : columns) {
+      addColumn(c);
+    }
+  }
+
+	public SchemaLegacy(Iterable<Column> columns) {
+    init();
+
+    for(Column c : columns) {
+      addColumn(c);
+    }
+  }
+
+  private void init() {
+    this.fields = new ArrayList<>();
+    this.fieldsByQualifiedName = new HashMap<>();
+    this.fieldsByName = new HashMap<>();
+  }
+
+  /**
+   * Set a qualifier to this schema.
+   * This changes the qualifier of all columns except for not-qualified columns.
+   *
+   * @param qualifier The qualifier
+   */
+  @Override
+  public void setQualifier(String qualifier) {
+    // only change root fields, and must keep each nested field simple name
+    List<Column> columns = getRootColumns();
+
+    fields.clear();
+    fieldsByQualifiedName.clear();
+    fieldsByName.clear();
+
+    Column newColumn;
+    for (Column c : columns) {
+      newColumn = new Column(qualifier + "." + c.getSimpleName(), c.typeDesc);
+      addColumn(newColumn);
+    }
+  }
+
+  @Override
+	public int size() {
+		return this.fields.size();
+	}
+
+  @Override
+  public Column getColumn(int id) {
+    return fields.get(id);
+  }
+
+  @Override
+  public Column getColumn(Column column) {
+    int idx = getIndex(column);
+    return idx >= 0 ? fields.get(idx) : null;
+  }
+
+  public int getIndex(Column column) {
+    if (!contains(column)) {
+      return -1;
+    }
+
+    if (column.hasQualifier()) {
+      return fieldsByQualifiedName.get(column.getQualifiedName());
+    } else {
+      return fieldsByName.get(column.getSimpleName()).get(0);
+    }
+  }
+
+  /**
+   * Get a column by a given name.
+   *
+   * @param name The column name to be found.
+   * @return The column matched to a given column name.
+   */
+  @Override
+  public Column getColumn(String name) {
+
+    if (NestedPathUtil.isPath(name)) {
+
+      // TODO - to be refactored
+      if (fieldsByQualifiedName.containsKey(name)) {
+        Column flattenColumn = fields.get(fieldsByQualifiedName.get(name));
+        if (flattenColumn != null) {
+          return flattenColumn;
+        }
+      }
+
+      String [] paths = name.split(NestedPathUtil.PATH_DELIMITER);
+      Column column = getColumn(paths[0]);
+      if (column == null) {
+        return null;
+      }
+      Column actualColumn = NestedPathUtil.lookupPath(column, paths);
+
+      Column columnPath = new Column(
+          column.getQualifiedName() + NestedPathUtil.makePath(paths, 1),
+          actualColumn.typeDesc);
+
+      return columnPath;
+    } else {
+      String[] parts = name.split("\\.");
+      // Some of the string can includes database name and table name and column name.
+      // For example, it can be 'default.table1.id'.
+      // Therefore, spilt string array length can be 3.
+      if (parts.length >= 2) {
+        return getColumnByQName(name);
+      } else {
+        return getColumnByName(name);
+      }
+    }
+  }
+
+  /**
+   * Find a column by a qualified name (e.g., table1.col1).
+   *
+   * @param qualifiedName The qualified name
+   * @return The Column matched to a given qualified name
+   */
+  private Column getColumnByQName(String qualifiedName) {
+		Integer cid = fieldsByQualifiedName.get(qualifiedName);
+		return cid != null ? fields.get(cid) : null;
+	}
+
+  /**
+   * Find a column by a name (e.g., col1).
+   * Columns with the same name can exist in a schema. For example, table1.col1 and table2.col1 can coexist.
+   * In this case, it will throw {@link RuntimeException}. But, it occurs rarely because all column names
+   * except for alias have a qualified form.
+   *
+   * @param columnName The column name without qualifier
+   * @return The Column matched to a given name, or null if no column has that name.
+   */
+	private Column getColumnByName(String columnName) {
+    String normalized = columnName;
+	  List<Integer> list = fieldsByName.get(normalized);
+
+    if (list == null || list.size() == 0) {
+      return null;
+    }
+
+    if (list.size() == 1) {
+      return fields.get(list.get(0));
+    } else {
+      throw throwAmbiguousFieldException(list);
+    }
+	}
+
+  private RuntimeException throwAmbiguousFieldException(Collection<Integer> idList) {
+    StringBuilder sb = new StringBuilder();
+    boolean first = true;
+    for (Integer id : idList) {
+      if (first) {
+        first = false;
+      } else {
+        sb.append(", ");
+      }
+      sb.append(fields.get(id));
+    }
+    throw new RuntimeException("Ambiguous Column Name Access: " + sb.toString());
+  }
+
+  @Override
+	public int getColumnId(String name) {
+    // If a column is registered under exactly this name, return its index right away.
+    if (fieldsByQualifiedName.containsKey(name)) {
+      return fieldsByQualifiedName.get(name);
+    }
+
+    // Workaround: otherwise fall back to a lookup by simple (unqualified) name.
+    List<Integer> list = fieldsByName.get(name);
+    if (list == null) {
+      return -1;
+    } else  if (list.size() == 1) {
+      return fieldsByName.get(name).get(0);
+    } else if (list.size() == 0) {
+      return -1;
+    } else { // list.size() > 1: the simple name matches several columns
+      throw throwAmbiguousFieldException(list);
+    }
+	}
+
+  @Override
+  public int getColumnIdByName(String colName) {
+    for (Column col : fields) {
+      if (col.getSimpleName().equals(colName)) {
+        String qualifiedName = col.getQualifiedName();
+        return fieldsByQualifiedName.get(qualifiedName);
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * Get root columns, meaning all columns except for nested fields.
+   *
+   * @return A list of root columns
+   */
+  @Override
+	public List<Column> getRootColumns() {
+		return ImmutableList.copyOf(fields);
+	}
+
+  /**
+   * Get all columns, including all nested fields
+   *
+   * @return A list of all columns
+   */
+  @Override
+  public List<Column> getAllColumns() {
+    final List<Column> columnList = new ArrayList<>();
+
+    SchemaUtil.visitSchema(this, new ColumnVisitor() {
+      @Override
+      public void visit(int depth, List<String> path, Column column) {
+        if (path.size() > 0) {
+          String parentPath = StringUtils.join(path, NestedPathUtil.PATH_DELIMITER);
+          String currentPath = parentPath + NestedPathUtil.PATH_DELIMITER + column.getSimpleName();
+          columnList.add(new Column(currentPath, column.getTypeDesc()));
+        } else {
+          columnList.add(column);
+        }
+      }
+    });
+
+    return columnList;
+  }
+
+  @Override
+  public boolean contains(String name) {
+    // TODO - It's a hack
+    if (NestedPathUtil.isPath(name)) {
+      return (getColumn(name) != null);
+    }
+
+    if (fieldsByQualifiedName.containsKey(name)) {
+      return true;
+    }
+    if (fieldsByName.containsKey(name)) {
+      if (fieldsByName.get(name).size() > 1) {
+        throw new RuntimeException("Ambiguous Column name");
+      }
+      return true;
+    }
+
+    return false;
+  }
+
+  @Override
+  public boolean contains(Column column) {
+    // TODO - It's a hack
+    if (NestedPathUtil.isPath(column.getQualifiedName())) {
+      return (getColumn(column.getQualifiedName()) != null);
+    }
+
+    if (column.hasQualifier()) {
+      return fieldsByQualifiedName.containsKey(column.getQualifiedName());
+    } else {
+      if (fieldsByName.containsKey(column.getSimpleName())) {
+        int num = fieldsByName.get(column.getSimpleName()).size();
+        if (num == 0) {
+          throw new IllegalStateException("No such column name: " + column.getSimpleName());
+        }
+        if (num > 1) {
+          throw new RuntimeException("Ambiguous column name: " + column.getSimpleName());
+        }
+        return true;
+      }
+      return false;
+    }
+  }
+
+  @Override
+	public boolean containsByQualifiedName(String qualifiedName) {
+		return fieldsByQualifiedName.containsKey(qualifiedName);
+	}
+
+  @Override
+  public boolean containsByName(String colName) {
+    return fieldsByName.containsKey(colName);
+  }
+
+  @Override
+  public boolean containsAll(Collection<Column> columns) {
+    boolean containFlag = true;
+
+    for (Column c :columns) {
+      if (NestedPathUtil.isPath(c.getSimpleName())) {
+        if (contains(c.getQualifiedName())) {
+          containFlag &= true;
+        } else {
+          String[] paths = c.getQualifiedName().split("/");
+          boolean existRootPath = contains(paths[0]);
+          boolean existLeafPath = getColumn(c.getSimpleName()) != null;
+          containFlag &= existRootPath && existLeafPath;
+        }
+      } else {
+        containFlag &= fields.contains(c);
+      }
+    }
+
+    return containFlag;
+  }
+
+  /**
+   * Return TRUE if any column in <code>columns</code> is included in this schema.
+   *
+   * @param columns Columns to be checked
+   * @return true if any column in <code>columns</code> is included in this schema.
+   *         Otherwise, false.
+   */
+  @Override
+  public boolean containsAny(Collection<Column> columns) {
+    for (Column column : columns) {
+      if (contains(column)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public synchronized SchemaLegacy addColumn(String name, TypeDesc typeDesc) {
+    String normalized = name;
+    if(fieldsByQualifiedName.containsKey(normalized)) {
+      throw new TajoRuntimeException(new DuplicateColumnException(normalized));
+    }
+
+    Column newCol = new Column(normalized, typeDesc);
+    fields.add(newCol);
+    fieldsByQualifiedName.put(newCol.getQualifiedName(), fields.size() - 1);
+    List<Integer> inputList = new ArrayList<>();
+    inputList.add(fields.size() - 1);
+    fieldsByName.put(newCol.getSimpleName(), inputList);
+
+    return this;
+  }
+
+  @Override
+  public synchronized SchemaLegacy addColumn(String name, Type type) {
+    return addColumn(name, CatalogUtil.newSimpleDataType(type));
+  }
+
+  @Override
+  public synchronized SchemaLegacy addColumn(String name, Type type, int length) {
+    return addColumn(name, CatalogUtil.newDataTypeWithLen(type, length));
+  }
+
+  @Override
+  public synchronized SchemaLegacy addColumn(String name, DataType dataType) {
+		addColumn(name, new TypeDesc(dataType));
+
+		return this;
+	}
+
+  @Override
+	public synchronized void addColumn(Column column) {
+		addColumn(column.getQualifiedName(), column.typeDesc);
+	}
+
+  @Override
+	public synchronized void addColumns(Schema schema) {
+    for(Column column : schema.getRootColumns()) {
+      addColumn(column);
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(fields, fieldsByQualifiedName, fieldsByName);
+  }
+
+  @Override
+	public boolean equals(Object o) {
+		if (o instanceof SchemaLegacy) {
+		  SchemaLegacy other = (SchemaLegacy) o;
+		  return getProto().equals(other.getProto());
+		}
+		return false;
+	}
+	
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    SchemaLegacy schema = (SchemaLegacy) super.clone();
+    schema.init();
+
+    for(Column column: this.fields) {
+      schema.addColumn(column);
+    }
+    return schema;
+  }
+
+	@Override
+	public SchemaProto getProto() {
+    SchemaProto.Builder builder = SchemaProto.newBuilder();
+    SchemaProtoBuilder recursiveBuilder = new SchemaProtoBuilder(builder);
+    SchemaUtil.visitSchema(this, recursiveBuilder);
+    return builder.build();
+	}
+
+  private static class SchemaProtoBuilder implements ColumnVisitor {
+    private SchemaProto.Builder builder;
+    public SchemaProtoBuilder(SchemaProto.Builder builder) {
+      this.builder = builder;
+    }
+
+    @Override
+    public void visit(int depth, List<String> path, Column column) {
+
+      if (column.getDataType().getType() == Type.RECORD) {
+        DataType.Builder updatedType = DataType.newBuilder(column.getDataType());
+        updatedType.setNumNestedFields(column.typeDesc.nestedRecordSchema.size());
+
+        ColumnProto.Builder updatedColumn = ColumnProto.newBuilder(column.getProto());
+        updatedColumn.setDataType(updatedType);
+
+        builder.addFields(updatedColumn.build());
+      } else {
+        builder.addFields(column.getProto());
+      }
+    }
+  }
+
+  @Override
+	public String toString() {
+	  StringBuilder sb = new StringBuilder();
+	  sb.append("{(").append(size()).append(") ");
+	  int i = 0;
+	  for(Column col : fields) {
+	    sb.append(col);
+	    if (i < fields.size() - 1) {
+	      sb.append(", ");
+	    }
+	    i++;
+	  }
+	  sb.append("}");
+	  
+	  return sb.toString();
+	}
+
+  @Override
+	public String toJson() {
+	  return CatalogGsonHelper.toJson(this, SchemaLegacy.class);
+	}
+
+  @Override
+  public Column [] toArray() {
+    return this.fields.toArray(new Column[this.fields.size()]);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/8dad551e/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java
index 0ffe584..0c62ae5 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java
@@ -42,7 +42,7 @@ public class SchemaUtil {
   // The essential solution would be https://issues.apache.org/jira/browse/TAJO-895.
   static int tmpColumnSeq = 0;
   public static Schema merge(Schema left, Schema right) {
-    Schema merged = new Schema();
+    Schema merged = SchemaFactory.newV1();
     for(Column col : left.getRootColumns()) {
       if (!merged.containsByQualifiedName(col.getQualifiedName())) {
         merged.addColumn(col);
@@ -67,7 +67,7 @@ public class SchemaUtil {
    * Get common columns to be used as join keys of natural joins.
    */
   public static Schema getNaturalJoinColumns(Schema left, Schema right) {
-    Schema common = new Schema();
+    Schema common = SchemaFactory.newV1();
     for (Column outer : left.getRootColumns()) {
       if (!common.containsByName(outer.getSimpleName()) && right.containsByName(outer.getSimpleName())) {
         common.addColumn(new Column(outer.getSimpleName(), outer.getDataType()));
@@ -78,7 +78,7 @@ public class SchemaUtil {
   }
 
   public static Schema getQualifiedLogicalSchema(TableDesc tableDesc, String tableName) {
-    Schema logicalSchema = new Schema(tableDesc.getLogicalSchema());
+    Schema logicalSchema = SchemaFactory.newV1(tableDesc.getLogicalSchema());
     if (tableName != null) {
       logicalSchema.setQualifier(tableName);
     }
@@ -208,7 +208,7 @@ public class SchemaUtil {
    */
   public static int estimateRowByteSizeWithSchema(Schema schema) {
     int size = 0;
-    for (Column column : schema.fields) {
+    for (Column column : schema.getAllColumns()) {
       size += getColByteSize(column);
     }
     return size;

http://git-wip-us.apache.org/repos/asf/tajo/blob/8dad551e/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
index 3cdc00b..392a83d 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
@@ -70,7 +70,7 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
 	}
 	
 	public TableDesc(TableDescProto proto) {
-	  this(proto.getTableName(), proto.hasSchema() ? new Schema(proto.getSchema()) : null,
+	  this(proto.getTableName(), proto.hasSchema() ? SchemaFactory.newV1(proto.getSchema()) : null,
         new TableMeta(proto.getMeta()), proto.hasPath() ? URI.create(proto.getPath()) : null, proto.getIsExternal());
     if(proto.hasStats()) {
       this.stats = new TableStats(proto.getStats());
@@ -122,7 +122,7 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
 
   public Schema getLogicalSchema() {
     if (hasPartition()) {
-      Schema logicalSchema = new Schema(schema);
+      Schema logicalSchema = SchemaFactory.newV1(schema);
       logicalSchema.addColumns(getPartitionMethod().getExpressionSchema());
       logicalSchema.setQualifier(tableName);
       return logicalSchema;

http://git-wip-us.apache.org/repos/asf/tajo/blob/8dad551e/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/json/CatalogGsonHelper.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/json/CatalogGsonHelper.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/json/CatalogGsonHelper.java
index ec439f0..c145ecd 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/json/CatalogGsonHelper.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/json/CatalogGsonHelper.java
@@ -21,10 +21,11 @@ package org.apache.tajo.catalog.json;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.function.Function;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.Datum;
+import org.apache.tajo.function.Function;
 import org.apache.tajo.json.*;
 
 import java.lang.reflect.Type;
@@ -38,14 +39,15 @@ public class CatalogGsonHelper {
   private CatalogGsonHelper() {
   }
 
-  private static Map<Type, GsonSerDerAdapter> registerAdapters() {
-    Map<Type, GsonSerDerAdapter> adapters = new HashMap<>();
+  private static Map<Type, GsonSerDerAdapter<?>> registerAdapters() {
+    Map<Type, GsonSerDerAdapter<?>> adapters = new HashMap<>();
     adapters.put(Class.class, new ClassNameSerializer());
     adapters.put(Path.class, new PathSerializer());
     adapters.put(TableMeta.class, new TableMetaAdapter());
     adapters.put(Function.class, new FunctionAdapter());
     adapters.put(Datum.class, new DatumAdapter());
     adapters.put(DataType.class, new DataTypeAdapter());
+    adapters.put(Schema.class, new SchemaAdapter());
     return adapters;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/8dad551e/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/json/SchemaAdapter.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/json/SchemaAdapter.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/json/SchemaAdapter.java
new file mode 100644
index 0000000..f7c2392
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/json/SchemaAdapter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.json;
+
+import com.google.gson.*;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaLegacy;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.function.Function;
+import org.apache.tajo.json.CommonGsonHelper;
+import org.apache.tajo.json.GsonSerDerAdapter;
+
+import java.lang.reflect.Type;
+
+public class SchemaAdapter implements GsonSerDerAdapter<Schema> {
+
+  @Override
+  public JsonElement serialize(Schema src, Type typeOfSrc,
+      JsonSerializationContext context) {
+    JsonObject jsonObj = new JsonObject();
+    jsonObj.addProperty("version", src instanceof SchemaLegacy ? "1" : "2");
+    JsonElement jsonElem = context.serialize(src);
+    jsonObj.add("body", jsonElem);
+    return jsonObj;
+  }
+
+  @Override
+  public Schema deserialize(JsonElement json, Type typeOfT,
+      JsonDeserializationContext context) throws JsonParseException {
+    JsonObject jsonObject = json.getAsJsonObject();
+    int version = CommonGsonHelper.getOrDie(jsonObject, "version").getAsJsonPrimitive().getAsInt();
+    // Only the legacy (version 1) layout can be restored; any other version is reported with its actual number.
+    if (version == 1) {
+      return context.deserialize(CommonGsonHelper.getOrDie(jsonObject, "body"), SchemaLegacy.class);
+    } else {
+      throw new TajoInternalError("Schema version " + version + " is not supported yet");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8dad551e/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionMethodDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionMethodDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionMethodDesc.java
index 721a7a0..d3f10ad 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionMethodDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionMethodDesc.java
@@ -23,6 +23,7 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaFactory;
 import org.apache.tajo.catalog.json.CatalogGsonHelper;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.ProtoObject;
@@ -59,7 +60,7 @@ public class PartitionMethodDesc implements ProtoObject<CatalogProtos.PartitionM
     this(proto.getTableIdentifier().getDatabaseName(),
         proto.getTableIdentifier().getTableName(),
         proto.getPartitionType(), proto.getExpression(),
-        new Schema(proto.getExpressionSchema()));
+        SchemaFactory.newV1(proto.getExpressionSchema()));
   }
 
   public String getTableName() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/8dad551e/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestIndexDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestIndexDesc.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestIndexDesc.java
index 7561dfd..0b7516e 100644
--- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestIndexDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestIndexDesc.java
@@ -40,7 +40,7 @@ public class TestIndexDesc {
 
   @BeforeClass
   public static void setUp() throws Exception {
-    relationSchema = new Schema(new Column[]{new Column("id", Type.INT4),
+    relationSchema = SchemaFactory.newV1(new Column[]{new Column("id", Type.INT4),
         new Column("score", Type.FLOAT8), new Column("name", Type.TEXT)});
     SortSpec[] colSpecs1 = new SortSpec[1];
     colSpecs1[0] = new SortSpec(new Column("id", Type.INT4), true, true);

http://git-wip-us.apache.org/repos/asf/tajo/blob/8dad551e/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestSchema.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestSchema.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestSchema.java
index c4092f0..6235945 100644
--- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestSchema.java
+++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestSchema.java
@@ -40,10 +40,10 @@ public class TestSchema {
 
   static {
     // simple nested schema
-    nestedSchema1 = new Schema();
+    nestedSchema1 = SchemaFactory.newV1();
     nestedSchema1.addColumn("s1", Type.INT8);
 
-    Schema nestedRecordSchema = new Schema();
+    Schema nestedRecordSchema = SchemaFactory.newV1();
     nestedRecordSchema.addColumn("s2", Type.FLOAT4);
     nestedRecordSchema.addColumn("s3", Type.TEXT);
 
@@ -63,10 +63,10 @@ public class TestSchema {
     //  |- s8
     //     |- s6
     //     |- s7
-    nestedSchema2 = new Schema();
+    nestedSchema2 = SchemaFactory.newV1();
     nestedSchema2.addColumn("s1", Type.INT8);
 
-    Schema nestedRecordSchema1 = new Schema();
+    Schema nestedRecordSchema1 = SchemaFactory.newV1();
     nestedRecordSchema1.addColumn("s2", Type.FLOAT4);
     nestedRecordSchema1.addColumn("s3", Type.TEXT);
 
@@ -75,7 +75,7 @@ public class TestSchema {
 
     nestedSchema2.addColumn("s5", Type.FLOAT8);
 
-    Schema nestedRecordSchema2 = new Schema();
+    Schema nestedRecordSchema2 = SchemaFactory.newV1();
     nestedRecordSchema2.addColumn("s6", Type.FLOAT4);
     nestedRecordSchema2.addColumn("s7", Type.TEXT);
 
@@ -95,18 +95,18 @@ public class TestSchema {
     //      |- s8
     //  |- s9
 
-    nestedSchema3 = new Schema();
+    nestedSchema3 = SchemaFactory.newV1();
     nestedSchema3.addColumn("s1", Type.INT8);
 
     nestedSchema3.addColumn("s2", Type.INT8);
 
-    Schema s5 = new Schema();
+    Schema s5 = SchemaFactory.newV1();
     s5.addColumn("s6", Type.INT8);
 
-    Schema s7 = new Schema();
+    Schema s7 = SchemaFactory.newV1();
     s7.addColumn("s5", new TypeDesc(s5));
 
-    Schema s3 = new Schema();
+    Schema s3 = SchemaFactory.newV1();
     s3.addColumn("s4", Type.INT8);
     s3.addColumn("s7", new TypeDesc(s7));
     s3.addColumn("s8", Type.INT8);
@@ -117,7 +117,7 @@ public class TestSchema {
 
 	@Before
 	public void setUp() throws Exception {
-		schema = new Schema();
+		schema = SchemaFactory.newV1();
 		col1 = new Column("name", Type.TEXT);
 		schema.addColumn(col1);
 		col2 = new Column("age", Type.INT4);
@@ -128,14 +128,14 @@ public class TestSchema {
 
 	@Test
 	public final void testSchemaSchema() {
-		Schema schema2 = new Schema(schema);
+		Schema schema2 = SchemaFactory.newV1(schema);
 		
 		assertEquals(schema, schema2);
 	}
 
 	@Test
 	public final void testSchemaSchemaProto() {
-		Schema schema2 = new Schema(schema.getProto());
+		Schema schema2 = SchemaFactory.newV1(schema.getProto());
 		
 		assertEquals(schema, schema2);
 	}
@@ -149,7 +149,7 @@ public class TestSchema {
 
 	@Test
 	public final void testAddField() {
-		Schema schema = new Schema();
+		Schema schema = SchemaFactory.newV1();
 		assertFalse(schema.containsByQualifiedName("studentId"));
 		schema.addColumn("studentId", Type.INT4);
 		assertTrue(schema.containsByQualifiedName("studentId"));
@@ -157,7 +157,7 @@ public class TestSchema {
 
 	@Test
 	public final void testEqualsObject() {
-		Schema schema2 = new Schema();
+		Schema schema2 = SchemaFactory.newV1();
 		schema2.addColumn("name", Type.TEXT);
 		schema2.addColumn("age", Type.INT4);
 		schema2.addColumn("addr", Type.TEXT);
@@ -176,11 +176,11 @@ public class TestSchema {
 	
 	@Test
 	public final void testClone() throws CloneNotSupportedException {
-	  Schema schema = new Schema();
+	  Schema schema = SchemaFactory.newV1();
 	  schema.addColumn("abc", Type.FLOAT8);
 	  schema.addColumn("bbc", Type.FLOAT8);
 	  
-	  Schema schema2 = new Schema(schema.getProto());
+	  Schema schema2 = SchemaFactory.newV1(schema.getProto());
 	  assertEquals(schema.getProto(), schema2.getProto());
 	  assertEquals(schema.getColumn(0), schema2.getColumn(0));
 	  assertEquals(schema.size(), schema2.size());
@@ -193,7 +193,7 @@ public class TestSchema {
 	
 	@Test(expected = TajoRuntimeException.class)
 	public final void testAddExistColumn() {
-    Schema schema = new Schema();
+    Schema schema = SchemaFactory.newV1();
     schema.addColumn("abc", Type.FLOAT8);
     schema.addColumn("bbc", Type.FLOAT8);
     schema.addColumn("abc", Type.INT4);
@@ -201,31 +201,31 @@ public class TestSchema {
 
 	@Test
 	public final void testJson() {
-		Schema schema2 = new Schema(schema.getProto());
+		Schema schema2 = SchemaFactory.newV1(schema.getProto());
 		String json = schema2.toJson();
-		Schema fromJson = CatalogGsonHelper.fromJson(json, Schema.class);
+		Schema fromJson = CatalogGsonHelper.fromJson(json, SchemaLegacy.class);
 		assertEquals(schema2, fromJson);
     assertEquals(schema2.getProto(), fromJson.getProto());
 	}
 
   @Test
   public final void testProto() {
-    Schema schema2 = new Schema(schema.getProto());
+    Schema schema2 = SchemaFactory.newV1(schema.getProto());
     SchemaProto proto = schema2.getProto();
-    Schema fromJson = new Schema(proto);
-    assertEquals(schema2, fromJson);
+    Schema fromProto = SchemaFactory.newV1(proto);
+    assertEquals(schema2, fromProto);
   }
 
   @Test
   public final void testSetQualifier() {
-    Schema schema2 = new Schema(schema.getProto());
+    Schema schema2 = SchemaFactory.newV1(schema.getProto());
     schema2.setQualifier("test1");
     Column column = schema2.getColumn(1);
     assertEquals(1, schema2.getColumnIdByName("age"));
     assertEquals(column, schema2.getColumn("age"));
     assertEquals(column, schema2.getColumn("test1.age"));
 
-    Schema schema3 = new Schema();
+    Schema schema3 = SchemaFactory.newV1();
     schema3.addColumn("tb1.col1", Type.INT4);
     schema3.addColumn("col2", Type.INT4);
     assertEquals("tb1", schema3.getColumn(0).getQualifier());
@@ -267,17 +267,17 @@ public class TestSchema {
 
   @Test
   public void testNestedRecord4() {
-    Schema root = new Schema();
+    Schema root = SchemaFactory.newV1();
 
-    Schema nf2DotNf1 = new Schema();
+    Schema nf2DotNf1 = SchemaFactory.newV1();
     nf2DotNf1.addColumn("f1", Type.INT8);
     nf2DotNf1.addColumn("f2", Type.INT8);
 
-    Schema nf2DotNf2 = new Schema();
+    Schema nf2DotNf2 = SchemaFactory.newV1();
     nf2DotNf2.addColumn("f1", Type.INT8);
     nf2DotNf2.addColumn("f2", Type.INT8);
 
-    Schema nf2 = new Schema();
+    Schema nf2 = SchemaFactory.newV1();
     nf2.addColumn("f1", Type.INT8);
     nf2.addColumn("nf1", new TypeDesc(nf2DotNf1));
     nf2.addColumn("nf2", new TypeDesc(nf2DotNf2));
@@ -295,7 +295,7 @@ public class TestSchema {
     assertEquals(s1, s1);
 
     SchemaProto proto = s1.getProto();
-    assertEquals("Proto (de)serialized schema is different from the original: ", s1, new Schema(proto));
+    assertEquals("Proto (de)serialized schema is different from the original: ", s1, SchemaFactory.newV1(proto));
 
     Schema cloned = null;
     try {

http://git-wip-us.apache.org/repos/asf/tajo/blob/8dad551e/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java
index b3d343d..f334738 100644
--- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java
@@ -41,7 +41,7 @@ public class TestTableDesc {
 	
 	@Before
 	public void setup() throws IOException {
-	  schema = new Schema();
+	  schema = SchemaFactory.newV1();
     schema.addColumn("name", Type.BLOB);
     schema.addColumn("addr", Type.TEXT);
     info = CatalogUtil.newTableMeta("TEXT");
@@ -67,7 +67,7 @@ public class TestTableDesc {
 
   @Test
   public void test() throws CloneNotSupportedException, IOException {
-    Schema schema = new Schema();
+    Schema schema = SchemaFactory.newV1();
     schema.addColumn("name", Type.BLOB);
     schema.addColumn("addr", Type.TEXT);
     TableMeta info = CatalogUtil.newTableMeta("TEXT");

http://git-wip-us.apache.org/repos/asf/tajo/blob/8dad551e/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableMeta.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableMeta.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableMeta.java
index d85fc48..2e4c6a9 100644
--- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableMeta.java
+++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableMeta.java
@@ -38,7 +38,7 @@ public class TestTableMeta {
   
   @Test
   public void testTableMetaTableProto() {    
-    Schema schema1 = new Schema();
+    Schema schema1 = SchemaFactory.newV1();
     schema1.addColumn("name", Type.BLOB);
     schema1.addColumn("addr", Type.TEXT);
     TableMeta meta1 = CatalogUtil.newTableMeta("TEXT");
@@ -49,7 +49,7 @@ public class TestTableMeta {
   
   @Test
   public final void testClone() throws CloneNotSupportedException {
-    Schema schema1 = new Schema();
+    Schema schema1 = SchemaFactory.newV1();
     schema1.addColumn("name", Type.BLOB);
     schema1.addColumn("addr", Type.TEXT);
     TableMeta meta1 = CatalogUtil.newTableMeta("TEXT");
@@ -61,7 +61,7 @@ public class TestTableMeta {
   
   @Test
   public void testSchema() throws CloneNotSupportedException {
-    Schema schema1 = new Schema();
+    Schema schema1 = SchemaFactory.newV1();
     schema1.addColumn("name", Type.BLOB);
     schema1.addColumn("addr", Type.TEXT);
     TableMeta meta1 = CatalogUtil.newTableMeta("TEXT");
@@ -78,7 +78,7 @@ public class TestTableMeta {
   
   @Test
   public void testEqualsObject() {   
-    Schema schema2 = new Schema();
+    Schema schema2 = SchemaFactory.newV1();
     schema2.addColumn("name", Type.BLOB);
     schema2.addColumn("addr", Type.TEXT);
     TableMeta meta2 = CatalogUtil.newTableMeta("TEXT");

http://git-wip-us.apache.org/repos/asf/tajo/blob/8dad551e/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
index 95cbf18..954817c 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
@@ -148,7 +148,7 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
       path = table.getPath();
 
       // convert HiveCatalogStore field schema into tajo field schema.
-      schema = new org.apache.tajo.catalog.Schema();
+      schema = SchemaFactory.newV1();
 
       List<FieldSchema> fieldSchemaList = table.getCols();
       boolean isPartitionKey;
@@ -238,7 +238,7 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
       List<FieldSchema> partitionKeys = table.getPartitionKeys();
 
       if (null != partitionKeys) {
-        org.apache.tajo.catalog.Schema expressionSchema = new org.apache.tajo.catalog.Schema();
+        org.apache.tajo.catalog.Schema expressionSchema = SchemaFactory.newV1();
         StringBuilder sb = new StringBuilder();
         if (partitionKeys.size() > 0) {
           for (int i = 0; i < partitionKeys.size(); i++) {
@@ -814,7 +814,7 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
       List<FieldSchema> partitionKeys = table.getPartitionKeys();
 
       if (partitionKeys != null && partitionKeys.size() > 0) {
-        org.apache.tajo.catalog.Schema expressionSchema = new org.apache.tajo.catalog.Schema();
+        org.apache.tajo.catalog.Schema expressionSchema = SchemaFactory.newV1();
         StringBuilder sb = new StringBuilder();
         if (partitionKeys.size() > 0) {
           for (int i = 0; i < partitionKeys.size(); i++) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/8dad551e/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
index 46935fc..e8d60cf 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
@@ -95,7 +95,7 @@ public class TestHiveCatalogStore {
   public void testTableUsingTextFile() throws Exception {
     TableMeta meta = new TableMeta(BuiltinStorages.TEXT, new KeyValueSet());
 
-    org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
+    org.apache.tajo.catalog.Schema schema = SchemaFactory.newV1();
     schema.addColumn("c_custkey", TajoDataTypes.Type.INT4);
     schema.addColumn("c_name", TajoDataTypes.Type.TEXT);
     schema.addColumn("c_address", TajoDataTypes.Type.TEXT);
@@ -135,7 +135,7 @@ public class TestHiveCatalogStore {
     options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE);
     TableMeta meta = new TableMeta(BuiltinStorages.RCFILE, options);
 
-    org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
+    org.apache.tajo.catalog.Schema schema = SchemaFactory.newV1();
     schema.addColumn("r_regionkey", TajoDataTypes.Type.INT4);
     schema.addColumn("r_name", TajoDataTypes.Type.TEXT);
     schema.addColumn("r_comment", TajoDataTypes.Type.TEXT);
@@ -169,7 +169,7 @@ public class TestHiveCatalogStore {
     options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE);
     TableMeta meta = new TableMeta(BuiltinStorages.RCFILE, options);
 
-    org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
+    org.apache.tajo.catalog.Schema schema = SchemaFactory.newV1();
     schema.addColumn("r_regionkey", TajoDataTypes.Type.INT4);
     schema.addColumn("r_name", TajoDataTypes.Type.TEXT);
     schema.addColumn("r_comment", TajoDataTypes.Type.TEXT);
@@ -203,7 +203,7 @@ public class TestHiveCatalogStore {
     options.set(StorageConstants.TEXT_NULL, StringEscapeUtils.escapeJava("\u0003"));
     TableMeta meta = new TableMeta(BuiltinStorages.TEXT, options);
 
-    org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
+    org.apache.tajo.catalog.Schema schema = SchemaFactory.newV1();
     schema.addColumn("s_suppkey", TajoDataTypes.Type.INT4);
     schema.addColumn("s_name", TajoDataTypes.Type.TEXT);
     schema.addColumn("s_address", TajoDataTypes.Type.TEXT);
@@ -252,7 +252,7 @@ public class TestHiveCatalogStore {
   public void testAddTableByPartition() throws Exception {
     TableMeta meta = new TableMeta("TEXT", new KeyValueSet());
 
-    org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
+    org.apache.tajo.catalog.Schema schema = SchemaFactory.newV1();
     schema.addColumn("n_name", TajoDataTypes.Type.TEXT);
     schema.addColumn("n_regionkey", TajoDataTypes.Type.INT4);
     schema.addColumn("n_comment", TajoDataTypes.Type.TEXT);
@@ -261,7 +261,7 @@ public class TestHiveCatalogStore {
     TableDesc table = new TableDesc(CatalogUtil.buildFQName(DB_NAME, NATION), schema, meta,
         new Path(warehousePath, new Path(DB_NAME, NATION)).toUri());
 
-    org.apache.tajo.catalog.Schema expressionSchema = new org.apache.tajo.catalog.Schema();
+    org.apache.tajo.catalog.Schema expressionSchema = SchemaFactory.newV1();
     expressionSchema.addColumn("n_nationkey", TajoDataTypes.Type.INT4);
     expressionSchema.addColumn("n_date", TajoDataTypes.Type.TEXT);
 
@@ -500,7 +500,7 @@ public class TestHiveCatalogStore {
   @Test
   public void testGetAllTableNames() throws Exception{
     TableMeta meta = new TableMeta(BuiltinStorages.TEXT, new KeyValueSet());
-    org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
+    org.apache.tajo.catalog.Schema schema = SchemaFactory.newV1();
     schema.addColumn("n_name", TajoDataTypes.Type.TEXT);
     schema.addColumn("n_regionkey", TajoDataTypes.Type.INT4);
     schema.addColumn("n_comment", TajoDataTypes.Type.TEXT);
@@ -528,7 +528,7 @@ public class TestHiveCatalogStore {
   @Test
   public void testDeleteTable() throws Exception {
     TableMeta meta = new TableMeta(BuiltinStorages.TEXT, new KeyValueSet());
-    org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
+    org.apache.tajo.catalog.Schema schema = SchemaFactory.newV1();
     schema.addColumn("n_name", TajoDataTypes.Type.TEXT);
     schema.addColumn("n_regionkey", TajoDataTypes.Type.INT4);
     schema.addColumn("n_comment", TajoDataTypes.Type.TEXT);
@@ -553,7 +553,7 @@ public class TestHiveCatalogStore {
     options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE);
     TableMeta meta = new TableMeta(BuiltinStorages.SEQUENCE_FILE, options);
 
-    org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
+    org.apache.tajo.catalog.Schema schema = SchemaFactory.newV1();
     schema.addColumn("r_regionkey", TajoDataTypes.Type.INT4);
     schema.addColumn("r_name", TajoDataTypes.Type.TEXT);
     schema.addColumn("r_comment", TajoDataTypes.Type.TEXT);
@@ -587,7 +587,7 @@ public class TestHiveCatalogStore {
     options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE);
     TableMeta meta = new TableMeta(BuiltinStorages.SEQUENCE_FILE, options);
 
-    org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
+    org.apache.tajo.catalog.Schema schema = SchemaFactory.newV1();
     schema.addColumn("r_regionkey", TajoDataTypes.Type.INT4);
     schema.addColumn("r_name", TajoDataTypes.Type.TEXT);
     schema.addColumn("r_comment", TajoDataTypes.Type.TEXT);
@@ -619,7 +619,7 @@ public class TestHiveCatalogStore {
   public void testTableUsingParquet() throws Exception {
     TableMeta meta = new TableMeta("PARQUET", new KeyValueSet());
 
-    org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
+    org.apache.tajo.catalog.Schema schema = SchemaFactory.newV1();
     schema.addColumn("c_custkey", TajoDataTypes.Type.INT4);
     schema.addColumn("c_name", TajoDataTypes.Type.TEXT);
     schema.addColumn("c_address", TajoDataTypes.Type.TEXT);
@@ -656,7 +656,7 @@ public class TestHiveCatalogStore {
 
     TableMeta meta = new TableMeta(BuiltinStorages.TEXT, new KeyValueSet());
 
-    org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
+    org.apache.tajo.catalog.Schema schema = SchemaFactory.newV1();
     schema.addColumn("col1", TajoDataTypes.Type.INT4);
     schema.addColumn("col2", TajoDataTypes.Type.INT1);
     schema.addColumn("col3", TajoDataTypes.Type.INT2);

http://git-wip-us.apache.org/repos/asf/tajo/blob/8dad551e/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
index 99ffcb5..1c93d08 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
@@ -2717,7 +2717,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
       // Since the column names in the unified name are always sorted
       // in order of occurrence position in the relation schema,
       // they can be uniquely identified.
-      String unifiedName = CatalogUtil.getUnifiedSimpleColumnName(new Schema(tableDescProto.getSchema()), columnNames);
+      String unifiedName = CatalogUtil.getUnifiedSimpleColumnName(SchemaFactory.newV1(tableDescProto.getSchema()), columnNames);
       pstmt.setInt(1, databaseId);
       pstmt.setInt(2, tableId);
       pstmt.setString(3, unifiedName);
@@ -2784,12 +2784,12 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
     try (PreparedStatement pstmt = getConnection().prepareStatement(sql)) {
       int databaseId = getDatabaseId(databaseName);
       int tableId = getTableId(databaseId, databaseName, tableName);
-      Schema relationSchema = new Schema(getTable(databaseName, tableName).getSchema());
+      Schema relationSchema = SchemaFactory.newV1(getTable(databaseName, tableName).getSchema());
 
       // Since the column names in the unified name are always sorted
       // in order of occurrence position in the relation schema,
       // they can be uniquely identified.
-      String unifiedName = CatalogUtil.getUnifiedSimpleColumnName(new Schema(relationSchema), columnNames);
+      String unifiedName = CatalogUtil.getUnifiedSimpleColumnName(SchemaFactory.newV1(relationSchema), columnNames);
       pstmt.setInt(1, databaseId);
       pstmt.setInt(2, tableId);
       pstmt.setString(3, unifiedName);

http://git-wip-us.apache.org/repos/asf/tajo/blob/8dad551e/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/CatalogTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/CatalogTestingUtil.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/CatalogTestingUtil.java
index 3a1a0cd..51461d1 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/CatalogTestingUtil.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/CatalogTestingUtil.java
@@ -197,7 +197,7 @@ public class CatalogTestingUtil {
   }
 
   public static TableDesc buildTableDesc(String databaseName, String tableName, String testDir) throws IOException {
-    Schema schema = new Schema();
+    Schema schema = SchemaFactory.newV1();
     schema.addColumn(CatalogUtil.buildFQName(tableName, "Column"), Type.BLOB);
     schema.addColumn(CatalogUtil.buildFQName(tableName, "column"), Type.INT4);
     schema.addColumn(CatalogUtil.buildFQName(tableName, "cOlumn"), Type.INT8);
@@ -213,7 +213,7 @@ public class CatalogTestingUtil {
   }
 
   public static TableDesc buildPartitionTableDesc(String databaseName, String tableName, String testDir) throws Exception {
-    Schema partSchema = new Schema();
+    Schema partSchema = SchemaFactory.newV1();
     partSchema.addColumn(CatalogUtil.buildFQName(tableName, "DaTe"), Type.TEXT);
     partSchema.addColumn(CatalogUtil.buildFQName(tableName, "dAtE"), Type.TEXT);
     PartitionMethodDesc partitionMethodDesc =