You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/10/27 07:40:54 UTC

[3/5] TAJO-287: Refactor TableDesc, TableMeta, and Fragment. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index e7cb7a0..0f1cbde 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -30,26 +30,23 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.*;
 import org.apache.hadoop.yarn.util.Records;
-import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.QueryUnitId;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Options;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.statistics.ColumnStat;
+import org.apache.tajo.catalog.statistics.ColumnStats;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
-import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.GroupbyNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.engine.planner.logical.StoreTableNode;
-import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.master.TaskRunnerGroupEvent;
 import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
 import org.apache.tajo.master.TaskScheduler;
@@ -79,8 +76,9 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   private MasterPlan masterPlan;
   private ExecutionBlock block;
   private int priority;
+  private Schema schema;
   private TableMeta meta;
-  private TableStat statistics;
+  private TableStats statistics;
   private EventHandler eventHandler;
   private final AbstractStorageManager sm;
   private TaskSchedulerImpl taskScheduler;
@@ -264,12 +262,15 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     return tasks.get(qid);
   }
 
-  @SuppressWarnings("UnusedDeclaration")
+  public Schema getSchema() {
+    return schema;
+  }
+
   public TableMeta getTableMeta() {
     return meta;
   }
 
-  public TableStat getTableStat() {
+  public TableStats getTableStat() {
     return statistics;
   }
 
@@ -306,12 +307,12 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     }
   }
 
-  public static TableStat computeStatFromUnionBlock(SubQuery subQuery) {
-    TableStat stat = new TableStat();
-    TableStat childStat;
+  public static TableStats computeStatFromUnionBlock(SubQuery subQuery) {
+    TableStats stat = new TableStats();
+    TableStats childStat;
     long avgRows = 0, numBytes = 0, numRows = 0;
     int numBlocks = 0, numPartitions = 0;
-    List<ColumnStat> columnStats = Lists.newArrayList();
+    List<ColumnStats> columnStatses = Lists.newArrayList();
 
     MasterPlan masterPlan = subQuery.getMasterPlan();
     Iterator<ExecutionBlock> it = masterPlan.getChilds(subQuery.getBlock()).iterator();
@@ -320,14 +321,14 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       SubQuery childSubQuery = subQuery.context.getSubQuery(block.getId());
       childStat = childSubQuery.getTableStat();
       avgRows += childStat.getAvgRows();
-      columnStats.addAll(childStat.getColumnStats());
+      columnStatses.addAll(childStat.getColumnStats());
       numBlocks += childStat.getNumBlocks();
       numBytes += childStat.getNumBytes();
       numPartitions += childStat.getNumPartitions();
       numRows += childStat.getNumRows();
     }
 
-    stat.setColumnStats(columnStats);
+    stat.setColumnStats(columnStatses);
     stat.setNumBlocks(numBlocks);
     stat.setNumBytes(numBytes);
     stat.setNumPartitions(numPartitions);
@@ -336,13 +337,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     return stat;
   }
 
-  private TableStat computeStatFromTasks() {
-    List<TableStat> stats = Lists.newArrayList();
+  private TableStats computeStatFromTasks() {
+    List<TableStats> stats = Lists.newArrayList();
     for (QueryUnit unit : getQueryUnits()) {
       stats.add(unit.getStats());
     }
-    TableStat tableStat = StatisticsUtil.aggregateTableStat(stats);
-    return tableStat;
+    TableStats tableStats = StatisticsUtil.aggregateTableStat(stats);
+    return tableStats;
   }
 
   private void stopScheduler() {
@@ -360,11 +361,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   }
 
   private void finish() {
-    TableStat stat;
+    TableStats stats;
     if (block.hasUnion()) {
-      stat = computeStatFromUnionBlock(this);
+      stats = computeStatFromUnionBlock(this);
     } else {
-      stat = computeStatFromTasks();
+      stats = computeStatFromTasks();
     }
 
     DataChannel channel = masterPlan.getOutgoingChannels(getId()).get(0);
@@ -376,12 +377,12 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     if (storeTableNode != null) {
       storeType = storeTableNode.getStorageType();
     }
-    meta = CatalogUtil.newTableMeta(channel.getSchema(), storeType, new Options());
-    meta.setStat(stat);
-    statistics = stat;
+    schema = channel.getSchema();
+    meta = CatalogUtil.newTableMeta(storeType, new Options());
+    statistics = stats;
     setFinishTime();
 
-    eventHandler.handle(new SubQuerySucceeEvent(getId(), meta));
+    eventHandler.handle(new SubQuerySucceeEvent(getId()));
   }
 
   @Override
@@ -592,7 +593,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       Map<String, TableDesc> tableMap = context.getTableDescMap();
       if (masterPlan.isLeaf(execBlock)) {
         ScanNode outerScan = execBlock.getScanNodes()[0];
-        TableStat stat = tableMap.get(outerScan.getCanonicalName()).getMeta().getStat();
+        TableStats stat = tableMap.get(outerScan.getCanonicalName()).getStats();
         return stat.getNumBytes();
       } else {
         long aggregatedVolume = 0;
@@ -641,7 +642,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       meta = desc.getMeta();
 
       // TODO - should be change the inner directory
-      List<Fragment> fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, inputPath);
+      List<Fragment> fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(),
+          inputPath);
 
       QueryUnit queryUnit;
       List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
index 2ba563c..c95ff14 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
@@ -13,7 +13,6 @@ import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.util.JSPUtil;
 import org.codehaus.jackson.map.DeserializationConfig;
 import org.codehaus.jackson.map.ObjectMapper;
-import org.glassfish.grizzly.threadpool.FixedThreadPool;
 
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
@@ -31,7 +30,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index e77086d..cca7242 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.worker;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,14 +32,13 @@ import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.logical.LogicalNode;
-import org.apache.tajo.engine.planner.logical.SortNode;
-import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.physical.PhysicalExec;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.QueryMasterProtocol.*;
@@ -55,10 +55,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -78,6 +75,7 @@ public class Task {
   private final TaskAttemptContext context;
   private List<Fetcher> fetcherRunners;
   private final LogicalNode plan;
+  private final Map<String, TableDesc> descs = Maps.newHashMap();
   private PhysicalExec executor;
   private boolean interQuery;
   private boolean killed = false;
@@ -150,6 +148,12 @@ public class Task {
     this.context.setEnforcer(request.getEnforcer());
 
     plan = CoreGsonHelper.fromJson(request.getSerializedData(), LogicalNode.class);
+    LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
+    for (LogicalNode node : scanNode) {
+      ScanNode scan = (ScanNode)node;
+      descs.put(scan.getCanonicalName(), scan.getTableDesc());
+    }
+
     interQuery = request.getProto().getInterQuery();
     if (interQuery) {
       context.setInterQuery();
@@ -179,7 +183,8 @@ public class Task {
 
     LOG.info("* Fragments (num: " + request.getFragments().size() + ")");
     for (Fragment f: request.getFragments()) {
-      LOG.info("Table Id:" + f.getName() + ", path:" + f.getPath() + "(" + f.getMeta().getStoreType() + "), " +
+      TableDesc desc = descs.get(f.getName());
+      LOG.info("Table Id:" + f.getName() + ", path:" + desc.getPath() + "(" + desc.getMeta().getStoreType() + "), " +
           "(start:" + f.getStartOffset() + ", length: " + f.getLength() + ")");
     }
     LOG.info("* Fetches (total:" + request.getFetches().size() + ") :");
@@ -328,7 +333,7 @@ public class Task {
     if (context.hasResultStats()) {
       builder.setResultStats(context.getResultStats().getProto());
     } else {
-      builder.setResultStats(new TableStat().getProto());
+      builder.setResultStats(new TableStats().getProto());
     }
 
     Iterator<Entry<Integer,String>> it = context.getRepartitions();
@@ -350,8 +355,7 @@ public class Task {
     Collection<String> inputs = Lists.newArrayList(context.getInputTables());
     for (String inputTable: inputs) {
       File tableDir = new File(context.getFetchIn(), inputTable);
-      Fragment [] frags = localizeFetchedData(tableDir, inputTable,
-          context.getTable(inputTable).getMeta());
+      Fragment [] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
       context.changeFragment(inputTable, frags);
     }
   }
@@ -461,7 +465,7 @@ public class Task {
       if (f.getLen() == 0) {
         continue;
       }
-      tablet = new Fragment(name, f.getPath(), meta, 0l, f.getLen());
+      tablet = new Fragment(name, f.getPath(), 0l, f.getLen());
       listTablets.add(tablet);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 906c4d6..f05e1a1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -25,7 +25,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
-import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
@@ -46,7 +46,7 @@ public class TaskAttemptContext {
   private final Map<String, List<Fragment>> fragmentMap = new HashMap<String, List<Fragment>>();
 
   private TaskAttemptState state;
-  private TableStat resultStats;
+  private TableStats resultStats;
   private QueryUnitAttemptId queryId;
   private final Path workDir;
   private boolean needFetch = false;
@@ -115,11 +115,11 @@ public class TaskAttemptContext {
     return resultStats != null;
   }
 
-  public void setResultStats(TableStat stats) {
+  public void setResultStats(TableStats stats) {
     this.resultStats = stats;
   }
 
-  public TableStat getResultStats() {
+  public TableStats getResultStats() {
     return this.resultStats;
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
index 5a70f91..89c40c8 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
@@ -121,8 +121,9 @@ message GetTableDescRequest {
 
 message CreateTableRequest {
   required string name = 1;
-  required string path = 2;
+  required SchemaProto schema = 2;
   required TableProto meta = 3;
+  required string path = 4;
 }
 
 message AttachTableRequest {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto
index 17b237b..b70efc6 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto
@@ -43,7 +43,6 @@ service TajoMasterClientProtocolService {
   rpc getTableDesc(GetTableDescRequest) returns (TableResponse);
   rpc createExternalTable(CreateTableRequest) returns (TableResponse);
   rpc dropTable(StringProto) returns (BoolProto);
-  rpc attachTable(AttachTableRequest) returns (TableResponse);
   rpc detachTable(StringProto) returns (BoolProto);
 
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index 109dfa9..987af25 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -34,14 +34,14 @@ message TaskStatusProto {
   required float progress = 3;
   required TaskAttemptState state = 4;
   optional StatSetProto stats = 5;
-  optional TableStatProto resultStats = 6;
+  optional TableStatsProto resultStats = 6;
   repeated Partition partitions = 7;
 }
 
 message TaskCompletionReport {
   required QueryUnitAttemptIdProto id = 1;
   optional StatSetProto stats = 2;
-  optional TableStatProto resultStats = 3;
+  optional TableStatsProto resultStats = 3;
   repeated Partition partitions = 4;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
index 61a8958..8f7189f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
@@ -33,7 +33,6 @@ import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.storage.*;
-import org.apache.tajo.util.FileUtil;
 
 import java.io.IOException;
 
@@ -46,7 +45,7 @@ public class BackendTestingUtil {
     mockupSchema = new Schema();
     mockupSchema.addColumn("deptname", Type.TEXT);
     mockupSchema.addColumn("score", Type.INT4);
-    mockupMeta = CatalogUtil.newTableMeta(mockupSchema, StoreType.CSV);
+    mockupMeta = CatalogUtil.newTableMeta(StoreType.CSV);
 	}
 
   public static void writeTmpTable(TajoConf conf, Path tablePath)
@@ -62,7 +61,7 @@ public class BackendTestingUtil {
     }
     fs.mkdirs(tablePath);
 
-    appender = sm.getAppender(mockupMeta, filePath);
+    appender = sm.getAppender(mockupMeta, mockupSchema, filePath);
     appender.init();
 
     int deptSize = 10000;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
index 178e8b3..fa67eb6 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -81,9 +81,8 @@ public class LocalTajoTestingUtility {
       fs.mkdirs(tablePath);
       Path dfsPath = new Path(tablePath, localPath.getName());
       fs.copyFromLocalFile(localPath, dfsPath);
-      TableMeta meta = CatalogUtil.newTableMeta(schemas[i],
-          CatalogProtos.StoreType.CSV, option);
-      client.createExternalTable(names[i], tablePath, meta);
+      TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV, option);
+      client.createExternalTable(names[i], schemas[i], tablePath, meta);
     }
 
     LOG.info("===================================================");

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index e348f2f..116e282 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -501,32 +501,6 @@ public class TajoTestingCluster {
     LOG.info("Minicluster is down");
   }
 
-  public static ResultSet runInLocal(String[] tableNames,
-                                     Schema[] schemas,
-                                     Options option,
-                                     String[][] tables,
-                                     String query) throws Exception {
-    TajoTestingCluster util = new TajoTestingCluster();
-    util.startMiniClusterInLocal(1);
-    TajoConf conf = util.getConfiguration();
-    TajoClient client = new TajoClient(conf);
-
-    File tmpDir = util.setupClusterTestBuildDir();
-    for (int i = 0; i < tableNames.length; i++) {
-      File tableDir = new File(tmpDir,tableNames[i]);
-      tableDir.mkdirs();
-      File tableFile = new File(tableDir, tableNames[i]);
-      writeLines(tableFile, tables[i]);
-      TableMeta meta = CatalogUtil
-          .newTableMeta(schemas[i], CatalogProtos.StoreType.CSV, option);
-      client.createExternalTable(tableNames[i], new Path(tableDir.getAbsolutePath()), meta);
-    }
-    Thread.sleep(1000);
-    ResultSet res = client.executeQueryAndGetResult(query);
-    util.shutdownMiniCluster();
-    return res;
-  }
-
   public static ResultSet run(String[] names,
                               Schema[] schemas,
                               Options option,
@@ -556,9 +530,8 @@ public class TajoTestingCluster {
         out.write((tables[i][j]+"\n").getBytes());
       }
       out.close();
-      TableMeta meta = CatalogUtil.newTableMeta(schemas[i],
-          CatalogProtos.StoreType.CSV, option);
-      client.createExternalTable(names[i], tablePath, meta);
+      TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV, option);
+      client.createExternalTable(names[i], schemas[i], tablePath, meta);
     }
     Thread.sleep(1000);
     ResultSet res = client.executeQueryAndGetResult(query);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestDDLBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestDDLBuilder.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestDDLBuilder.java
index 6123f59..f91e8ec 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestDDLBuilder.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestDDLBuilder.java
@@ -37,11 +37,11 @@ public class TestDDLBuilder {
     Schema schema = new Schema();
     schema.addColumn("name", TajoDataTypes.Type.BLOB);
     schema.addColumn("addr", TajoDataTypes.Type.TEXT);
-    TableMeta meta = CatalogUtil.newTableMeta(schema, CatalogProtos.StoreType.CSV);
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV);
     meta.putOption("csv.delimiter", "|");
     meta.putOption(TableMeta.COMPRESSION_CODEC, GzipCodec.class.getName());
 
-    TableDesc desc = new TableDescImpl("table1", meta, new Path("/table1"));
+    TableDesc desc = new TableDesc("table1", schema, meta, new Path("/table1"));
 
     assertEquals(FileUtil.readTextFile(new File("src/test/results/testBuildDDL.result")), DDLBuilder.buildDDL(desc));
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
index 09d8ceb..bd62f40 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -89,7 +89,7 @@ public class TestTajoClient {
 
     assertFalse(client.existTable(tableName));
 
-    client.createExternalTable(tableName, tablePath, BackendTestingUtil.mockupMeta);
+    client.createExternalTable(tableName, BackendTestingUtil.mockupSchema, tablePath, BackendTestingUtil.mockupMeta);
     assertTrue(client.existTable(tableName));
     client.dropTable(tableName);
     assertFalse(client.existTable(tableName));
@@ -183,12 +183,12 @@ public class TestTajoClient {
     assertNotNull(tablePath);
     assertFalse(client.existTable(tableName1));
 
-    client.createExternalTable("table3", tablePath, BackendTestingUtil.mockupMeta);
+    client.createExternalTable("table3", BackendTestingUtil.mockupSchema, tablePath, BackendTestingUtil.mockupMeta);
     assertTrue(client.existTable(tableName1));
 
     TableDesc desc = client.getTableDesc(tableName1);
     assertNotNull(desc);
     assertEquals(tableName1, desc.getName());
-    assertTrue(desc.getMeta().getStat().getNumBytes() > 0);
+    assertTrue(desc.getStats().getNumBytes() > 0);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index 95f024c..c35c786 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@ -114,7 +114,7 @@ public class ExprTestBase {
           vtuple.put(i, lazyTuple.get(i));
         }
       }
-      cat.addTable(new TableDescImpl(tableName, inputSchema, CatalogProtos.StoreType.CSV, new Options(), CommonTestingUtil.getTestDir()));
+      cat.addTable(new TableDesc(tableName, inputSchema, CatalogProtos.StoreType.CSV, new Options(), CommonTestingUtil.getTestDir()));
     }
 
     Target [] targets = null;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
index 1304de5..b99dcce 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
@@ -65,8 +65,8 @@ public class TestEvalTree {
     schema.addColumn("score", INT4);
     schema.addColumn("age", INT4);
 
-    TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
-    TableDesc desc = new TableDescImpl("people", meta, CommonTestingUtil.getTestDir());
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    TableDesc desc = new TableDesc("people", schema, meta, CommonTestingUtil.getTestDir());
     cat.addTable(desc);
 
     FunctionDesc funcMeta = new FunctionDesc("test_sum", TestSum.class, FunctionType.GENERAL,
@@ -159,7 +159,7 @@ public class TestEvalTree {
 
     EvalNode expr;
 
-    Schema peopleSchema = cat.getTableDesc("people").getMeta().getSchema();
+    Schema peopleSchema = cat.getTableDesc("people").getSchema();
     EvalContext evalCtx;
     expr = getRootSelection(QUERIES[0]);
     evalCtx = expr.newContext();
@@ -609,7 +609,7 @@ public class TestEvalTree {
     assertTrue(not.terminate(evalCtx).asBool());
     
     // Evaluation Test
-    Schema peopleSchema = cat.getTableDesc("people").getMeta().getSchema();
+    Schema peopleSchema = cat.getTableDesc("people").getSchema();
     expr = getRootSelection(NOT[0]);
     evalCtx = expr.newContext();
     expr.eval(evalCtx, peopleSchema, tuples[0]);
@@ -631,7 +631,7 @@ public class TestEvalTree {
   public final void testLike() {
     EvalNode expr;
 
-    Schema peopleSchema = cat.getTableDesc("people").getMeta().getSchema();
+    Schema peopleSchema = cat.getTableDesc("people").getSchema();
     // prefix
     expr = getRootSelection(LIKE[0]);
     EvalContext evalCtx = expr.newContext();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
index b4edf6a..1850081 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
@@ -72,8 +72,8 @@ public class TestEvalTreeUtil {
     schema.addColumn("score", TajoDataTypes.Type.INT4);
     schema.addColumn("age", TajoDataTypes.Type.INT4);
 
-    TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
-    TableDesc desc = new TableDescImpl("people", meta, CommonTestingUtil.getTestDir());
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    TableDesc desc = new TableDesc("people", schema, meta, CommonTestingUtil.getTestDir());
     catalog.addTable(desc);
 
     FunctionDesc funcMeta = new FunctionDesc("test_sum", TestSum.class,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalNode.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalNode.java
index 1a2b360..596ce1e 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalNode.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalNode.java
@@ -47,12 +47,12 @@ public class TestLogicalNode {
     schema.addColumn("age", Type.INT2);
     GroupbyNode groupbyNode = new GroupbyNode(0, new Column[]{schema.getColumn(1), schema.getColumn(2)});
     ScanNode scanNode = new ScanNode(0,
-        CatalogUtil.newTableDesc("in", CatalogUtil.newTableMeta(schema, StoreType.CSV), new Path("in")));
+        CatalogUtil.newTableDesc("in", schema, CatalogUtil.newTableMeta(StoreType.CSV), new Path("in")));
 
     GroupbyNode groupbyNode2 = new GroupbyNode(0, new Column[]{schema.getColumn(1), schema.getColumn(2)});
     JoinNode joinNode = new JoinNode(0);
     ScanNode scanNode2 = new ScanNode(0,
-        CatalogUtil.newTableDesc("in2", CatalogUtil.newTableMeta(schema, StoreType.CSV), new Path("in2")));
+        CatalogUtil.newTableDesc("in2", schema, CatalogUtil.newTableMeta(StoreType.CSV), new Path("in2")));
 
     groupbyNode.setChild(scanNode);
     groupbyNode2.setChild(joinNode);
@@ -63,7 +63,7 @@ public class TestLogicalNode {
     assertFalse(groupbyNode.deepEquals(groupbyNode2));
 
     ScanNode scanNode3 = new ScanNode(0,
-        CatalogUtil.newTableDesc("in", CatalogUtil.newTableMeta(schema, StoreType.CSV), new Path("in")));
+        CatalogUtil.newTableDesc("in", schema, CatalogUtil.newTableMeta(StoreType.CSV), new Path("in")));
     groupbyNode2.setChild(scanNode3);
 
     assertTrue(groupbyNode.equals(groupbyNode2));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
index 8b25cf6..50bc8dc 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
@@ -66,14 +66,14 @@ public class TestLogicalOptimizer {
     schema3.addColumn("score", Type.INT4);
     schema3.addColumn("phone", Type.INT4);
 
-    TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
-    TableDesc people = new TableDescImpl("employee", meta, CommonTestingUtil.getTestDir());
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    TableDesc people = new TableDesc("employee", schema, meta, CommonTestingUtil.getTestDir());
     catalog.addTable(people);
 
-    TableDesc student = new TableDescImpl("dept", schema2, StoreType.CSV, new Options(), CommonTestingUtil.getTestDir());
+    TableDesc student = new TableDesc("dept", schema2, StoreType.CSV, new Options(), CommonTestingUtil.getTestDir());
     catalog.addTable(student);
 
-    TableDesc score = new TableDescImpl("score", schema3, StoreType.CSV, new Options(), CommonTestingUtil.getTestDir());
+    TableDesc score = new TableDesc("score", schema3, StoreType.CSV, new Options(), CommonTestingUtil.getTestDir());
     catalog.addTable(score);
 
     FunctionDesc funcDesc = new FunctionDesc("sumtest", SumInt.class, FunctionType.GENERAL,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
index d830591..025c84b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
@@ -22,7 +22,7 @@ import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.benchmark.TPCH;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.graph.SimpleDirectedGraph;
 import org.apache.tajo.master.TajoMaster;
@@ -65,11 +65,12 @@ public class TestLogicalPlan {
     tpch.loadOutSchema();
 
     for (int i = 0; i < tpchTables.length; i++) {
-      TableMeta m = CatalogUtil.newTableMeta(tpch.getSchema(tpchTables[i]), CatalogProtos.StoreType.CSV);
-      TableStat stat = new TableStat();
-      stat.setNumBytes(tableVolumns[i]);
-      m.setStat(stat);
-      TableDesc d = CatalogUtil.newTableDesc(tpchTables[i], m, CommonTestingUtil.getTestDir());
+      TableMeta m = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV);
+      TableStats stats = new TableStats();
+      stats.setNumBytes(tableVolumns[i]);
+      TableDesc d = CatalogUtil.newTableDesc(tpchTables[i], tpch.getSchema(tpchTables[i]), m,
+          CommonTestingUtil.getTestDir());
+      d.setStats(stats);
       catalog.addTable(d);
     }
     planner = new LogicalPlanner(catalog);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index 9e4727b..b3aaf32 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -75,14 +75,14 @@ public class TestLogicalPlanner {
     schema3.addColumn("deptname", Type.TEXT);
     schema3.addColumn("score", Type.INT4);
 
-    TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
-    TableDesc people = new TableDescImpl("employee", meta, CommonTestingUtil.getTestDir());
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    TableDesc people = new TableDesc("employee", schema, meta, CommonTestingUtil.getTestDir());
     catalog.addTable(people);
 
-    TableDesc student = new TableDescImpl("dept", schema2, StoreType.CSV, new Options(), CommonTestingUtil.getTestDir());
+    TableDesc student = new TableDesc("dept", schema2, StoreType.CSV, new Options(), CommonTestingUtil.getTestDir());
     catalog.addTable(student);
 
-    TableDesc score = new TableDescImpl("score", schema3, StoreType.CSV, new Options(), CommonTestingUtil.getTestDir());
+    TableDesc score = new TableDesc("score", schema3, StoreType.CSV, new Options(), CommonTestingUtil.getTestDir());
     catalog.addTable(score);
 
     FunctionDesc funcDesc = new FunctionDesc("sumtest", SumInt.class, FunctionType.AGGREGATION,
@@ -98,8 +98,8 @@ public class TestLogicalPlanner {
     tpch.loadSchemas();
     tpch.loadOutSchema();
     for (String table : tpchTables) {
-      TableMeta m = CatalogUtil.newTableMeta(tpch.getSchema(table), StoreType.CSV);
-      TableDesc d = CatalogUtil.newTableDesc(table, m, CommonTestingUtil.getTestDir());
+      TableMeta m = CatalogUtil.newTableMeta(StoreType.CSV);
+      TableDesc d = CatalogUtil.newTableDesc(table, tpch.getSchema(table), m, CommonTestingUtil.getTestDir());
       catalog.addTable(d);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index b2727b2..b456cf9 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@ -66,14 +66,14 @@ public class TestPlannerUtil {
     schema3.addColumn("deptName", Type.TEXT);
     schema3.addColumn("score", CatalogUtil.newSimpleDataType(Type.INT4));
 
-    TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
-    TableDesc people = new TableDescImpl("employee", meta, CommonTestingUtil.getTestDir());
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    TableDesc people = new TableDesc("employee", schema, meta, CommonTestingUtil.getTestDir());
     catalog.addTable(people);
 
-    TableDesc student = new TableDescImpl("dept", schema2, StoreType.CSV, new Options(), CommonTestingUtil.getTestDir());
+    TableDesc student = new TableDesc("dept", schema2, StoreType.CSV, new Options(), CommonTestingUtil.getTestDir());
     catalog.addTable(student);
 
-    TableDesc score = new TableDescImpl("score", schema3, StoreType.CSV, new Options(), CommonTestingUtil.getTestDir());
+    TableDesc score = new TableDesc("score", schema3, StoreType.CSV, new Options(), CommonTestingUtil.getTestDir());
     catalog.addTable(score);
 
     FunctionDesc funcDesc = new FunctionDesc("sumtest", SumInt.class, FunctionType.AGGREGATION,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index 555b698..4d2bd6c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -78,11 +78,11 @@ public class TestBNLJoinExec {
     schema.addColumn("memId", Type.INT4);
     schema.addColumn("deptName", Type.TEXT);
 
-    TableMeta employeeMeta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
     appender.init();
-    Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
+    Tuple tuple = new VTuple(schema.getColumnNum());
     for (int i = 0; i < OUTER_TUPLE_NUM; i++) {
       tuple.put(new Datum[] { DatumFactory.createInt4(i),
           DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i),
@@ -91,7 +91,7 @@ public class TestBNLJoinExec {
     }
     appender.flush();
     appender.close();
-    employee = CatalogUtil.newTableDesc("employee", employeeMeta, employeePath);
+    employee = CatalogUtil.newTableDesc("employee", schema, employeeMeta, employeePath);
     catalog.addTable(employee);
 
     Schema peopleSchema = new Schema();
@@ -99,11 +99,11 @@ public class TestBNLJoinExec {
     peopleSchema.addColumn("fk_memId", Type.INT4);
     peopleSchema.addColumn("name", Type.TEXT);
     peopleSchema.addColumn("age", Type.INT4);
-    TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
-    tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
+    tuple = new VTuple(peopleSchema.getColumnNum());
     for (int i = 1; i < INNER_TUPLE_NUM; i += 2) {
       tuple.put(new Datum[] { DatumFactory.createInt4(i),
           DatumFactory.createInt4(10 + i),
@@ -114,7 +114,7 @@ public class TestBNLJoinExec {
     appender.flush();
     appender.close();
 
-    people = CatalogUtil.newTableDesc("people", peopleMeta, peoplePath);
+    people = CatalogUtil.newTableDesc("people", peopleSchema, peopleMeta, peoplePath);
     catalog.addTable(people);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index 8945832..615befa 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -103,14 +103,15 @@ public class TestBSTIndexExec {
     writer.open();
     long offset;
 
-    meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
+    meta = CatalogUtil.newTableMeta(StoreType.CSV);
     tablePath = StorageUtil.concatPath(workDir, "employee", "table.csv");
     fs = tablePath.getFileSystem(conf);
     fs.mkdirs(tablePath.getParent());
 
-    FileAppender appender = (FileAppender)StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
+    FileAppender appender = (FileAppender)StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema,
+        tablePath);
     appender.init();
-    Tuple tuple = new VTuple(meta.getSchema().getColumnNum());
+    Tuple tuple = new VTuple(schema.getColumnNum());
     for (int i = 0; i < 10000; i++) {
       
       Tuple key = new VTuple(this.idxSchema.getColumnNum());
@@ -134,8 +135,7 @@ public class TestBSTIndexExec {
     appender.close();
     writer.close();
 
-    TableDesc desc = new TableDescImpl("employee", meta,
-        sm.getTablePath("employee"));
+    TableDesc desc = new TableDesc("employee", schema, meta, sm.getTablePath("employee"));
     catalog.addTable(desc);
 
     analyzer = new SQLAnalyzer();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index febb315..282f5be 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -75,12 +75,12 @@ public class TestExternalSortExec {
     schema.addColumn("empId", Type.INT4);
     schema.addColumn("deptName", Type.TEXT);
 
-    TableMeta employeeMeta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
     appender.enableStats();
     appender.init();
-    Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
+    Tuple tuple = new VTuple(schema.getColumnNum());
     for (int i = 0; i < numTuple; i++) {
       tuple.put(new Datum[] { DatumFactory.createInt4(rnd.nextInt(50)),
           DatumFactory.createInt4(rnd.nextInt(100)),
@@ -92,7 +92,7 @@ public class TestExternalSortExec {
 
     System.out.println("Total Rows: " + appender.getStats().getNumRows());
 
-    employee = new TableDescImpl("employee", employeeMeta, employeePath);
+    employee = new TableDesc("employee", schema, employeeMeta, employeePath);
     catalog.addTable(employee);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
index f293f70..c7fec48 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
@@ -91,11 +91,11 @@ public class TestFullOuterHashJoinExec {
     dep3Schema.addColumn("loc_id", Type.INT4);
 
 
-    TableMeta dep3Meta = CatalogUtil.newTableMeta(dep3Schema, StoreType.CSV);
+    TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Path);
+    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
-    Tuple tuple = new VTuple(dep3Meta.getSchema().getColumnNum());
+    Tuple tuple = new VTuple(dep3Schema.getColumnNum());
     for (int i = 0; i < 10; i++) {
       tuple.put(new Datum[] { DatumFactory.createInt4(i),
           DatumFactory.createText("dept_" + i),
@@ -105,7 +105,7 @@ public class TestFullOuterHashJoinExec {
 
     appender1.flush();
     appender1.close();
-    dep3 = CatalogUtil.newTableDesc("dep3", dep3Meta, dep3Path);
+    dep3 = CatalogUtil.newTableDesc("dep3", dep3Schema, dep3Meta, dep3Path);
     catalog.addTable(dep3);
 
     //----------------- job3 ------------------------------
@@ -120,12 +120,11 @@ public class TestFullOuterHashJoinExec {
     job3Schema.addColumn("job_title", Type.TEXT);
 
 
-    TableMeta job3Meta = CatalogUtil.newTableMeta(job3Schema,
-        StoreType.CSV);
+    TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Path);
+    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
-    Tuple tuple2 = new VTuple(job3Meta.getSchema().getColumnNum());
+    Tuple tuple2 = new VTuple(job3Schema.getColumnNum());
     for (int i = 1; i < 4; i++) {
       int x = 100 + i;
       tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
@@ -135,7 +134,7 @@ public class TestFullOuterHashJoinExec {
 
     appender2.flush();
     appender2.close();
-    job3 = CatalogUtil.newTableDesc("job3", job3Meta, job3Path);
+    job3 = CatalogUtil.newTableDesc("job3", job3Schema, job3Meta, job3Path);
     catalog.addTable(job3);
 
 
@@ -160,11 +159,11 @@ public class TestFullOuterHashJoinExec {
     emp3Schema.addColumn("job_id", Type.INT4);
 
 
-    TableMeta emp3Meta = CatalogUtil.newTableMeta(emp3Schema, StoreType.CSV);
+    TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Path);
+    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
-    Tuple tuple3 = new VTuple(emp3Meta.getSchema().getColumnNum());
+    Tuple tuple3 = new VTuple(emp3Schema.getColumnNum());
 
     for (int i = 1; i < 4; i += 2) {
       int x = 10 + i;
@@ -199,7 +198,7 @@ public class TestFullOuterHashJoinExec {
 
     appender3.flush();
     appender3.close();
-    emp3 = CatalogUtil.newTableDesc("emp3", emp3Meta, emp3Path);
+    emp3 = CatalogUtil.newTableDesc("emp3", emp3Schema, emp3Meta, emp3Path);
     catalog.addTable(emp3);
 
     //---------------------phone3 --------------------
@@ -212,15 +211,15 @@ public class TestFullOuterHashJoinExec {
     phone3Schema.addColumn("phone_number", Type.TEXT);
 
 
-    TableMeta phone3Meta = CatalogUtil.newTableMeta(phone3Schema,
-        StoreType.CSV);
+    TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path phone3Path = new Path(testDir, "phone3.csv");
-    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Path);
+    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
+        phone3Path);
     appender5.init();
 
     appender5.flush();
     appender5.close();
-    phone3 = CatalogUtil.newTableDesc("phone3", phone3Meta, phone3Path);
+    phone3 = CatalogUtil.newTableDesc("phone3", phone3Schema, phone3Meta, phone3Path);
     catalog.addTable(phone3);
 
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
index 8d0466f..61802fd 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
@@ -94,11 +94,11 @@ public class TestFullOuterMergeJoinExec {
     dep3Schema.addColumn("loc_id", Type.INT4);
 
 
-    TableMeta dep3Meta = CatalogUtil.newTableMeta(dep3Schema, StoreType.CSV);
+    TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Path);
+    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
-    Tuple tuple = new VTuple(dep3Meta.getSchema().getColumnNum());
+    Tuple tuple = new VTuple(dep3Schema.getColumnNum());
     for (int i = 0; i < 10; i++) {
       tuple.put(new Datum[] { DatumFactory.createInt4(i),
           DatumFactory.createText("dept_" + i),
@@ -108,7 +108,7 @@ public class TestFullOuterMergeJoinExec {
 
     appender1.flush();
     appender1.close();
-    dep3 = CatalogUtil.newTableDesc("dep3", dep3Meta, dep3Path);
+    dep3 = CatalogUtil.newTableDesc("dep3", dep3Schema, dep3Meta, dep3Path);
     catalog.addTable(dep3);
 
 
@@ -132,12 +132,11 @@ public class TestFullOuterMergeJoinExec {
     dep4Schema.addColumn("loc_id", Type.INT4);
 
 
-    TableMeta dep4Meta = CatalogUtil.newTableMeta(dep4Schema,
-        StoreType.CSV);
+    TableMeta dep4Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path dep4Path = new Path(testDir, "dep4.csv");
-    Appender appender4 = StorageManagerFactory.getStorageManager(conf).getAppender(dep4Meta, dep4Path);
+    Appender appender4 = StorageManagerFactory.getStorageManager(conf).getAppender(dep4Meta, dep4Schema, dep4Path);
     appender4.init();
-    Tuple tuple4 = new VTuple(dep4Meta.getSchema().getColumnNum());
+    Tuple tuple4 = new VTuple(dep4Schema.getColumnNum());
     for (int i = 0; i < 11; i++) {
       tuple4.put(new Datum[] { DatumFactory.createInt4(i),
           DatumFactory.createText("dept_" + i),
@@ -147,7 +146,7 @@ public class TestFullOuterMergeJoinExec {
 
     appender4.flush();
     appender4.close();
-    dep4 = CatalogUtil.newTableDesc("dep4", dep4Meta, dep4Path);
+    dep4 = CatalogUtil.newTableDesc("dep4", dep4Schema, dep4Meta, dep4Path);
     catalog.addTable(dep4);
 
 
@@ -164,12 +163,11 @@ public class TestFullOuterMergeJoinExec {
     job3Schema.addColumn("job_title", Type.TEXT);
 
 
-    TableMeta job3Meta = CatalogUtil.newTableMeta(job3Schema,
-        StoreType.CSV);
+    TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Path);
+    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
-    Tuple tuple2 = new VTuple(job3Meta.getSchema().getColumnNum());
+    Tuple tuple2 = new VTuple(job3Schema.getColumnNum());
     for (int i = 1; i < 4; i++) {
       int x = 100 + i;
       tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
@@ -179,7 +177,7 @@ public class TestFullOuterMergeJoinExec {
 
     appender2.flush();
     appender2.close();
-    job3 = CatalogUtil.newTableDesc("job3", job3Meta, job3Path);
+    job3 = CatalogUtil.newTableDesc("job3", job3Schema, job3Meta, job3Path);
     catalog.addTable(job3);
 
 
@@ -204,11 +202,11 @@ public class TestFullOuterMergeJoinExec {
     emp3Schema.addColumn("job_id", Type.INT4);
 
 
-    TableMeta emp3Meta = CatalogUtil.newTableMeta(emp3Schema, StoreType.CSV);
+    TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Path);
+    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
-    Tuple tuple3 = new VTuple(emp3Meta.getSchema().getColumnNum());
+    Tuple tuple3 = new VTuple(emp3Schema.getColumnNum());
 
     for (int i = 1; i < 4; i += 2) {
       int x = 10 + i;
@@ -243,7 +241,7 @@ public class TestFullOuterMergeJoinExec {
 
     appender3.flush();
     appender3.close();
-    emp3 = CatalogUtil.newTableDesc("emp3", emp3Meta, emp3Path);
+    emp3 = CatalogUtil.newTableDesc("emp3", emp3Schema, emp3Meta, emp3Path);
     catalog.addTable(emp3);
 
     //---------------------phone3 --------------------
@@ -256,14 +254,14 @@ public class TestFullOuterMergeJoinExec {
     phone3Schema.addColumn("phone_number", Type.TEXT);
 
 
-    TableMeta phone3Meta = CatalogUtil.newTableMeta(phone3Schema,
-        StoreType.CSV);
+    TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path phone3Path = new Path(testDir, "phone3.csv");
-    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Path);
+    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
+        phone3Path);
     appender5.init();
     appender5.flush();
     appender5.close();
-    phone3 = CatalogUtil.newTableDesc("phone3", phone3Meta, phone3Path);
+    phone3 = CatalogUtil.newTableDesc("phone3", phone3Schema, phone3Meta, phone3Path);
     catalog.addTable(phone3);
 
     analyzer = new SQLAnalyzer();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index a91bbdd..e521a94 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -74,12 +74,12 @@ public class TestHashAntiJoinExec {
     employeeSchema.addColumn("memId", Type.INT4);
     employeeSchema.addColumn("deptName", Type.TEXT);
 
-    TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema,
-        StoreType.CSV);
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
+        employeePath);
     appender.init();
-    Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
+    Tuple tuple = new VTuple(employeeSchema.getColumnNum());
 
     for (int i = 0; i < 10; i++) {
       tuple.put(new Datum[] {
@@ -92,7 +92,7 @@ public class TestHashAntiJoinExec {
 
     appender.flush();
     appender.close();
-    employee = CatalogUtil.newTableDesc("employee", employeeMeta, employeePath);
+    employee = CatalogUtil.newTableDesc("employee", employeeSchema, employeeMeta, employeePath);
     catalog.addTable(employee);
 
     Schema peopleSchema = new Schema();
@@ -100,11 +100,11 @@ public class TestHashAntiJoinExec {
     peopleSchema.addColumn("fk_memId", Type.INT4);
     peopleSchema.addColumn("name", Type.TEXT);
     peopleSchema.addColumn("age", Type.INT4);
-    TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
-    tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
+    tuple = new VTuple(peopleSchema.getColumnNum());
     for (int i = 1; i < 10; i += 2) {
       tuple.put(new Datum[] {
           DatumFactory.createInt4(i), // empid [1, 3, 5, 7, 9]
@@ -117,7 +117,7 @@ public class TestHashAntiJoinExec {
     appender.flush();
     appender.close();
 
-    people = CatalogUtil.newTableDesc("people", peopleMeta, peoplePath);
+    people = CatalogUtil.newTableDesc("people", peopleSchema, peopleMeta, peoplePath);
     catalog.addTable(people);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index 53ae60f..ba02c21 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -76,12 +76,12 @@ public class TestHashJoinExec {
     employeeSchema.addColumn("memId", Type.INT4);
     employeeSchema.addColumn("deptName", Type.TEXT);
 
-    TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema,
-        StoreType.CSV);
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
+        employeePath);
     appender.init();
-    Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
+    Tuple tuple = new VTuple(employeeSchema.getColumnNum());
     for (int i = 0; i < 10; i++) {
       tuple.put(new Datum[] { DatumFactory.createInt4(i),
           DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i),
@@ -91,7 +91,7 @@ public class TestHashJoinExec {
 
     appender.flush();
     appender.close();
-    employee = CatalogUtil.newTableDesc("employee", employeeMeta, employeePath);
+    employee = CatalogUtil.newTableDesc("employee", employeeSchema, employeeMeta, employeePath);
     catalog.addTable(employee);
 
     Schema peopleSchema = new Schema();
@@ -99,11 +99,11 @@ public class TestHashJoinExec {
     peopleSchema.addColumn("fk_memId", Type.INT4);
     peopleSchema.addColumn("name", Type.TEXT);
     peopleSchema.addColumn("age", Type.INT4);
-    TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
-    tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
+    tuple = new VTuple(peopleSchema.getColumnNum());
     for (int i = 1; i < 10; i += 2) {
       tuple.put(new Datum[] { DatumFactory.createInt4(i),
           DatumFactory.createInt4(10 + i),
@@ -115,7 +115,7 @@ public class TestHashJoinExec {
     appender.flush();
     appender.close();
 
-    people = CatalogUtil.newTableDesc("people", peopleMeta, peoplePath);
+    people = CatalogUtil.newTableDesc("people", peopleSchema, peopleMeta, peoplePath);
     catalog.addTable(people);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index c5553f0..e4dcb95 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -74,12 +74,12 @@ public class TestHashSemiJoinExec {
     employeeSchema.addColumn("memId", Type.INT4);
     employeeSchema.addColumn("deptName", Type.TEXT);
 
-    TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema,
-        StoreType.CSV);
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
+        employeePath);
     appender.init();
-    Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
+    Tuple tuple = new VTuple(employeeSchema.getColumnNum());
 
     for (int i = 0; i < 10; i++) {
       tuple.put(new Datum[] {
@@ -92,7 +92,7 @@ public class TestHashSemiJoinExec {
 
     appender.flush();
     appender.close();
-    employee = CatalogUtil.newTableDesc("employee", employeeMeta, employeePath);
+    employee = CatalogUtil.newTableDesc("employee", employeeSchema, employeeMeta, employeePath);
     catalog.addTable(employee);
 
     Schema peopleSchema = new Schema();
@@ -100,11 +100,11 @@ public class TestHashSemiJoinExec {
     peopleSchema.addColumn("fk_memId", Type.INT4);
     peopleSchema.addColumn("name", Type.TEXT);
     peopleSchema.addColumn("age", Type.INT4);
-    TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
-    tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
+    tuple = new VTuple(peopleSchema.getColumnNum());
     // make 27 tuples
     for (int i = 1; i < 10; i += 2) {
       // make three duplicated tuples for each tuples
@@ -121,7 +121,7 @@ public class TestHashSemiJoinExec {
     appender.flush();
     appender.close();
 
-    people = CatalogUtil.newTableDesc("people", peopleMeta, peoplePath);
+    people = CatalogUtil.newTableDesc("people", peopleSchema, peopleMeta, peoplePath);
     catalog.addTable(people);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
index 0317a15..58f9b3b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
@@ -91,12 +91,11 @@ public class TestLeftOuterHashJoinExec {
     dep3Schema.addColumn("loc_id", Type.INT4);
 
 
-    TableMeta dep3Meta = CatalogUtil.newTableMeta(dep3Schema,
-        StoreType.CSV);
+    TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Path);
+    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
-    Tuple tuple = new VTuple(dep3Meta.getSchema().getColumnNum());
+    Tuple tuple = new VTuple(dep3Schema.getColumnNum());
     for (int i = 0; i < 10; i++) {
       tuple.put(new Datum[] { DatumFactory.createInt4(i),
                     DatumFactory.createText("dept_" + i),
@@ -106,7 +105,7 @@ public class TestLeftOuterHashJoinExec {
 
     appender1.flush();
     appender1.close();
-    dep3 = CatalogUtil.newTableDesc("dep3", dep3Meta, dep3Path);
+    dep3 = CatalogUtil.newTableDesc("dep3", dep3Schema, dep3Meta, dep3Path);
     catalog.addTable(dep3);
 
     //----------------- job3 ------------------------------
@@ -121,12 +120,11 @@ public class TestLeftOuterHashJoinExec {
     job3Schema.addColumn("job_title", Type.TEXT);
 
 
-    TableMeta job3Meta = CatalogUtil.newTableMeta(job3Schema,
-        StoreType.CSV);
+    TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Path);
+    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
-    Tuple tuple2 = new VTuple(job3Meta.getSchema().getColumnNum());
+    Tuple tuple2 = new VTuple(job3Schema.getColumnNum());
     for (int i = 1; i < 4; i++) {
       int x = 100 + i;
       tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
@@ -136,7 +134,7 @@ public class TestLeftOuterHashJoinExec {
 
     appender2.flush();
     appender2.close();
-    job3 = CatalogUtil.newTableDesc("job3", job3Meta, job3Path);
+    job3 = CatalogUtil.newTableDesc("job3", job3Schema, job3Meta, job3Path);
     catalog.addTable(job3);
 
 
@@ -161,11 +159,11 @@ public class TestLeftOuterHashJoinExec {
     emp3Schema.addColumn("job_id", Type.INT4);
 
 
-    TableMeta emp3Meta = CatalogUtil.newTableMeta(emp3Schema, StoreType.CSV);
+    TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Path);
+    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
-    Tuple tuple3 = new VTuple(emp3Meta.getSchema().getColumnNum());
+    Tuple tuple3 = new VTuple(emp3Schema.getColumnNum());
 
     for (int i = 1; i < 4; i += 2) {
       int x = 10 + i;
@@ -200,7 +198,7 @@ public class TestLeftOuterHashJoinExec {
 
     appender3.flush();
     appender3.close();
-    emp3 = CatalogUtil.newTableDesc("emp3", emp3Meta, emp3Path);
+    emp3 = CatalogUtil.newTableDesc("emp3", emp3Schema, emp3Meta, emp3Path);
     catalog.addTable(emp3);
 
     //---------------------phone3 --------------------
@@ -213,15 +211,15 @@ public class TestLeftOuterHashJoinExec {
     phone3Schema.addColumn("phone_number", Type.TEXT);
 
 
-    TableMeta phone3Meta = CatalogUtil.newTableMeta(phone3Schema,
-        StoreType.CSV);
+    TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path phone3Path = new Path(testDir, "phone3.csv");
-    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Path);
+    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
+        phone3Path);
     appender5.init();
     
     appender5.flush();
     appender5.close();
-    phone3 = CatalogUtil.newTableDesc("phone3", phone3Meta, phone3Path);
+    phone3 = CatalogUtil.newTableDesc("phone3", phone3Schema, phone3Meta, phone3Path);
     catalog.addTable(phone3);
 
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
index 32cf936..025af87 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
@@ -19,8 +19,8 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -37,16 +37,15 @@ import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.apache.tajo.LocalTajoTestingUtility;
 
 import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 
 public class TestLeftOuterNLJoinExec {
   private TajoConf conf;
@@ -90,12 +89,11 @@ public class TestLeftOuterNLJoinExec {
     dep3Schema.addColumn("loc_id", Type.INT4);
 
 
-    TableMeta dep3Meta = CatalogUtil.newTableMeta(dep3Schema,
-        StoreType.CSV);
+    TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Path);
+    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
-    Tuple tuple = new VTuple(dep3Meta.getSchema().getColumnNum());
+    Tuple tuple = new VTuple(dep3Schema.getColumnNum());
     for (int i = 0; i < 10; i++) {
       tuple.put(new Datum[] { DatumFactory.createInt4(i),
                     DatumFactory.createText("dept_" + i),
@@ -105,7 +103,7 @@ public class TestLeftOuterNLJoinExec {
 
     appender1.flush();
     appender1.close();
-    dep3 = CatalogUtil.newTableDesc("dep3", dep3Meta, dep3Path);
+    dep3 = CatalogUtil.newTableDesc("dep3", dep3Schema, dep3Meta, dep3Path);
     catalog.addTable(dep3);
 
     //----------------- job3 ------------------------------
@@ -120,12 +118,11 @@ public class TestLeftOuterNLJoinExec {
     job3Schema.addColumn("job_title", Type.TEXT);
 
 
-    TableMeta job3Meta = CatalogUtil.newTableMeta(job3Schema,
-        StoreType.CSV);
+    TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Path);
+    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
-    Tuple tuple2 = new VTuple(job3Meta.getSchema().getColumnNum());
+    Tuple tuple2 = new VTuple(job3Schema.getColumnNum());
     for (int i = 1; i < 4; i++) {
       int x = 100 + i;
       tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
@@ -135,7 +132,7 @@ public class TestLeftOuterNLJoinExec {
 
     appender2.flush();
     appender2.close();
-    job3 = CatalogUtil.newTableDesc("job3", job3Meta, job3Path);
+    job3 = CatalogUtil.newTableDesc("job3", job3Schema, job3Meta, job3Path);
     catalog.addTable(job3);
 
 
@@ -160,11 +157,11 @@ public class TestLeftOuterNLJoinExec {
     emp3Schema.addColumn("job_id", Type.INT4);
 
 
-    TableMeta emp3Meta = CatalogUtil.newTableMeta(emp3Schema, StoreType.CSV);
+    TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Path);
+    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
-    Tuple tuple3 = new VTuple(emp3Meta.getSchema().getColumnNum());
+    Tuple tuple3 = new VTuple(emp3Schema.getColumnNum());
 
     for (int i = 1; i < 4; i += 2) {
       int x = 10 + i;
@@ -199,7 +196,7 @@ public class TestLeftOuterNLJoinExec {
 
     appender3.flush();
     appender3.close();
-    emp3 = CatalogUtil.newTableDesc("emp3", emp3Meta, emp3Path);
+    emp3 = CatalogUtil.newTableDesc("emp3", emp3Schema, emp3Meta, emp3Path);
     catalog.addTable(emp3);
 
     // ---------------------phone3 --------------------
@@ -212,15 +209,15 @@ public class TestLeftOuterNLJoinExec {
     phone3Schema.addColumn("phone_number", Type.TEXT);
 
 
-    TableMeta phone3Meta = CatalogUtil.newTableMeta(phone3Schema,
-        StoreType.CSV);
+    TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path phone3Path = new Path(testDir, "phone3.csv");
-    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Path);
+    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
+        phone3Path);
     appender5.init();
     
     appender5.flush();
     appender5.close();
-    phone3 = CatalogUtil.newTableDesc("phone3", phone3Meta, phone3Path);
+    phone3 = CatalogUtil.newTableDesc("phone3", phone3Schema, phone3Meta, phone3Path);
     catalog.addTable(phone3);
 
     analyzer = new SQLAnalyzer();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index 1149c3f..5e72428 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -77,12 +77,12 @@ public class TestMergeJoinExec {
     employeeSchema.addColumn("memId", Type.INT4);
     employeeSchema.addColumn("deptName", Type.TEXT);
 
-    TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema,
-        StoreType.CSV);
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
+        employeePath);
     appender.init();
-    Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
+    Tuple tuple = new VTuple(employeeSchema.getColumnNum());
     for (int i = 0; i < 10; i++) {
       tuple.put(new Datum[] { DatumFactory.createInt4(i),
           DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i),
@@ -98,8 +98,7 @@ public class TestMergeJoinExec {
 
     appender.flush();
     appender.close();
-    employee = CatalogUtil.newTableDesc("employee", employeeMeta,
-        employeePath);
+    employee = CatalogUtil.newTableDesc("employee", employeeSchema, employeeMeta, employeePath);
     catalog.addTable(employee);
 
     Schema peopleSchema = new Schema();
@@ -107,11 +106,11 @@ public class TestMergeJoinExec {
     peopleSchema.addColumn("fk_memId", Type.INT4);
     peopleSchema.addColumn("name", Type.TEXT);
     peopleSchema.addColumn("age", Type.INT4);
-    TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
-    tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
+    tuple = new VTuple(peopleSchema.getColumnNum());
     for (int i = 1; i < 10; i += 2) {
       tuple.put(new Datum[] { DatumFactory.createInt4(i),
           DatumFactory.createInt4(10 + i),
@@ -130,7 +129,7 @@ public class TestMergeJoinExec {
     appender.flush();
     appender.close();
 
-    people = CatalogUtil.newTableDesc("people", peopleMeta, peoplePath);
+    people = CatalogUtil.newTableDesc("people", peopleSchema, peopleMeta, peoplePath);
     catalog.addTable(people);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
index d4498c5..3d356ee 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -78,11 +78,11 @@ public class TestNLJoinExec {
     schema.addColumn("memId", Type.INT4);
     schema.addColumn("deptName", Type.TEXT);
 
-    TableMeta employeeMeta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
     appender.init();
-    Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
+    Tuple tuple = new VTuple(schema.getColumnNum());
     for (int i = 0; i < 50; i++) {
       tuple.put(new Datum[] {
           DatumFactory.createInt4(i),
@@ -93,8 +93,7 @@ public class TestNLJoinExec {
     }
     appender.flush();
     appender.close();
-    employee = CatalogUtil.newTableDesc("employee", employeeMeta,
-        employeePath);
+    employee = CatalogUtil.newTableDesc("employee", schema, employeeMeta, employeePath);
     catalog.addTable(employee);
     
     Schema peopleSchema = new Schema();
@@ -102,11 +101,11 @@ public class TestNLJoinExec {
     peopleSchema.addColumn("fk_memId", Type.INT4);
     peopleSchema.addColumn("name", Type.TEXT);
     peopleSchema.addColumn("age", Type.INT4);
-    TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
-    tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
+    tuple = new VTuple(peopleSchema.getColumnNum());
     for (int i = 1; i < 50; i += 2) {
       tuple.put(new Datum[] {
           DatumFactory.createInt4(i),
@@ -118,8 +117,7 @@ public class TestNLJoinExec {
     appender.flush();
     appender.close();
     
-    people = CatalogUtil.newTableDesc("people", peopleMeta,
-        peoplePath);
+    people = CatalogUtil.newTableDesc("people", peopleSchema, peopleMeta, peoplePath);
     catalog.addTable(people);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);