You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/06/25 10:09:07 UTC

[15/15] tajo git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support

Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support

Conflicts:
	tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
	tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
	tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
	tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
	tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
	tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
	tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
	tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
	tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
	tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
	tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
	tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
	tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
	tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
	tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
	tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
	tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
	tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/17dfe86c
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/17dfe86c
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/17dfe86c

Branch: refs/heads/index_support
Commit: 17dfe86ccacaf83a81d1f9b45d7c6b82409f0f82
Parents: 00a8c65 2ec307d
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Jun 25 17:08:23 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Jun 25 17:08:23 2015 +0900

----------------------------------------------------------------------
 .travis.yml                                     |   4 +-
 CHANGES                                         |   7 +-
 NOTICE                                          |   2 +-
 .../org/apache/tajo/algebra/CreateTable.java    |  14 +
 .../org/apache/tajo/catalog/CatalogUtil.java    |   7 +
 .../org/apache/tajo/catalog/DDLBuilder.java     |   5 +-
 .../java/org/apache/tajo/catalog/TableDesc.java |   8 +-
 .../org/apache/tajo/catalog/TestTableDesc.java  |   2 +-
 .../tajo/catalog/store/HiveCatalogStore.java    |   2 +-
 .../catalog/store/TestHiveCatalogStore.java     |  25 +-
 .../tajo/catalog/store/AbstractDBStore.java     |  44 +--
 .../src/main/resources/schemas/derby/derby.xml  |   2 +-
 .../main/resources/schemas/mariadb/mariadb.xml  |   3 +-
 .../src/main/resources/schemas/mysql/mysql.xml  |   3 +-
 .../main/resources/schemas/oracle/oracle.xml    |   3 +-
 .../resources/schemas/postgresql/postgresql.xml |   4 +-
 .../cli/tsql/commands/DescTableCommand.java     |   2 +-
 .../main/java/org/apache/tajo/QueryVars.java    |   4 +-
 .../apache/tajo/storage/StorageConstants.java   |   9 +
 .../org/apache/tajo/storage/StorageService.java |  37 ++
 .../java/org/apache/tajo/util/FileUtil.java     |  14 +-
 .../java/org/apache/tajo/util/KeyValueSet.java  |   8 +-
 .../java/org/apache/tajo/util/NumberUtil.java   |  54 ---
 .../org/apache/tajo/util/ReflectionUtil.java    |   6 +-
 .../org/apache/tajo/engine/parser/SQLParser.g4  |   8 +-
 .../apache/tajo/engine/parser/SQLAnalyzer.java  |  11 +-
 .../engine/planner/PhysicalPlannerImpl.java     |  11 +-
 .../engine/planner/global/GlobalPlanner.java    |   4 +-
 .../planner/physical/BSTIndexScanExec.java      |   2 +-
 .../planner/physical/ColPartitionStoreExec.java |   2 +-
 .../planner/physical/FilterScanIterator.java    |  56 +++
 .../planner/physical/FullScanIterator.java      |  47 +++
 .../engine/planner/physical/InsertRowsExec.java | 107 +++++
 .../planner/physical/PhysicalPlanUtil.java      |   2 +-
 .../physical/RangeShuffleFileWriteExec.java     |   2 +-
 .../engine/planner/physical/ScanIterator.java   |  33 ++
 .../engine/planner/physical/SeqScanExec.java    |  40 +-
 .../engine/planner/physical/StoreTableExec.java |  23 +-
 .../apache/tajo/engine/query/QueryContext.java  |  71 +++-
 .../org/apache/tajo/master/GlobalEngine.java    |  17 +-
 .../java/org/apache/tajo/master/TajoMaster.java |  22 +-
 .../tajo/master/TajoMasterClientService.java    |   3 +-
 .../apache/tajo/master/exec/DDLExecutor.java    |  58 ++-
 .../exec/NonForwardQueryResultFileScanner.java  |   2 +-
 .../apache/tajo/master/exec/QueryExecutor.java  | 253 ++++++------
 .../master/exec/prehook/CreateTableHook.java    |   8 +-
 .../prehook/DistributedQueryHookManager.java    |   2 +-
 .../master/exec/prehook/InsertIntoHook.java     |   8 +-
 .../java/org/apache/tajo/querymaster/Query.java |  53 ++-
 .../tajo/querymaster/QueryMasterTask.java       | 150 ++-----
 .../apache/tajo/querymaster/Repartitioner.java  |  95 +++--
 .../java/org/apache/tajo/querymaster/Stage.java |  12 +-
 .../java/org/apache/tajo/querymaster/Task.java  |  56 +--
 .../tajo/webapp/QueryExecutorServlet.java       |   2 +-
 .../tajo/worker/ExecutionBlockContext.java      |  31 +-
 .../org/apache/tajo/worker/LegacyTaskImpl.java  |   2 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |   5 +-
 .../java/org/apache/tajo/worker/TaskImpl.java   |   2 +-
 .../src/main/proto/TajoWorkerProtocol.proto     |  16 +-
 .../resources/webapps/admin/catalogview.jsp     |   3 +-
 .../src/main/resources/webapps/admin/index.jsp  |  23 +-
 .../org/apache/tajo/BackendTestingUtil.java     |   2 +-
 .../java/org/apache/tajo/QueryTestCaseBase.java |  39 +-
 .../org/apache/tajo/TajoTestingCluster.java     |  35 +-
 .../org/apache/tajo/cli/tsql/TestTajoCli.java   |   4 +-
 .../org/apache/tajo/client/TestTajoClient.java  |  18 +-
 .../apache/tajo/engine/eval/ExprTestBase.java   |   9 +-
 .../tajo/engine/eval/TestEvalTreeUtil.java      |   4 +-
 .../engine/planner/TestLogicalOptimizer.java    |   5 +-
 .../tajo/engine/planner/TestLogicalPlan.java    |   3 +-
 .../tajo/engine/planner/TestLogicalPlanner.java |  92 +++--
 .../tajo/engine/planner/TestPlannerUtil.java    |   5 +-
 .../planner/physical/TestBNLJoinExec.java       |  15 +-
 .../planner/physical/TestExternalSortExec.java  |   6 +-
 .../physical/TestFullOuterHashJoinExec.java     |  29 +-
 .../physical/TestFullOuterMergeJoinExec.java    |  40 +-
 .../planner/physical/TestHashAntiJoinExec.java  |  15 +-
 .../planner/physical/TestHashJoinExec.java      |  14 +-
 .../planner/physical/TestHashSemiJoinExec.java  |  14 +-
 .../physical/TestLeftOuterHashJoinExec.java     |  31 +-
 .../planner/physical/TestMergeJoinExec.java     |  10 +-
 .../engine/planner/physical/TestNLJoinExec.java |  14 +-
 .../planner/physical/TestPhysicalPlanner.java   |  57 ++-
 .../physical/TestProgressExternalSortExec.java  |   6 +-
 .../physical/TestRightOuterHashJoinExec.java    |  20 +-
 .../physical/TestRightOuterMergeJoinExec.java   |  36 +-
 .../engine/planner/physical/TestSortExec.java   |  18 +-
 .../apache/tajo/engine/query/TestCTASQuery.java |   8 +-
 .../tajo/engine/query/TestCreateTable.java      |   8 +-
 .../tajo/engine/query/TestHBaseTable.java       | 274 +++++++------
 .../tajo/engine/query/TestInsertQuery.java      |   6 +-
 .../apache/tajo/engine/query/TestJoinQuery.java |   6 +-
 .../tajo/engine/query/TestTablePartitions.java  |  28 +-
 .../apache/tajo/ha/TestHAServiceHDFSImpl.java   |  88 +++--
 .../org/apache/tajo/jdbc/TestResultSet.java     |   4 +-
 .../tajo/master/TestExecutionBlockCursor.java   |  12 +-
 .../TestNonForwardQueryResultSystemScanner.java | 258 +-----------
 .../apache/tajo/master/TestRepartitioner.java   |  77 ++--
 .../tajo/querymaster/TestIntermediateEntry.java |  24 +-
 .../apache/tajo/querymaster/TestKillQuery.java  |  10 +-
 .../org/apache/tajo/storage/TestRowFile.java    |   5 +-
 .../results/TestHBaseTable/testCATS.result      | 100 -----
 .../results/TestHBaseTable/testCTAS.result      | 100 +++++
 .../testInsertIntoUsingPut.result               |   4 +-
 .../TestHBaseTable/testInsertValues1.result     |   4 +
 .../testGetClusterDetails.result                |   4 +
 .../testGetNextRowsForAggregateFunction.result  |   3 +
 .../testGetNextRowsForTable.result              |   5 +
 .../results/TestTajoCli/testDescTable.result    |   8 +-
 .../testDescTableForNestedSchema.result         |   4 +-
 .../src/main/conf/storage-site.json.template    |  35 ++
 tajo-docs/src/main/sphinx/index.rst             |   1 +
 tajo-docs/src/main/sphinx/storage_plugin.rst    |  47 +++
 tajo-docs/src/main/sphinx/table_management.rst  |   1 +
 .../sphinx/table_management/table_overview.rst  |   7 +
 .../sphinx/table_management/tablespaces.rst     |  45 +++
 .../java/org/apache/tajo/plan/LogicalPlan.java  |   8 +
 .../org/apache/tajo/plan/LogicalPlanner.java    |  62 ++-
 .../tajo/plan/logical/CreateTableNode.java      |  44 +--
 .../apache/tajo/plan/logical/InsertNode.java    |  51 +--
 .../plan/logical/PartitionedTableScanNode.java  |   2 +-
 .../org/apache/tajo/plan/logical/ScanNode.java  |   2 +-
 .../tajo/plan/logical/StoreTableNode.java       |  27 ++
 .../rewrite/rules/PartitionedTableRewriter.java |   4 +-
 .../plan/serder/LogicalNodeDeserializer.java    |  16 +-
 .../tajo/plan/serder/LogicalNodeSerializer.java |  25 +-
 .../org/apache/tajo/plan/util/PlannerUtil.java  |  30 +-
 tajo-plan/src/main/proto/Plan.proto             |  11 +-
 tajo-storage/tajo-storage-common/pom.xml        |  12 +-
 .../org/apache/tajo/storage/FormatProperty.java |  63 +++
 .../org/apache/tajo/storage/MergeScanner.java   |   5 +-
 .../apache/tajo/storage/OldStorageManager.java  | 234 +++++++++++
 .../apache/tajo/storage/StorageProperty.java    |  59 ++-
 .../org/apache/tajo/storage/StorageUtil.java    |   4 +-
 .../apache/tajo/storage/TableSpaceManager.java  | 238 -----------
 .../org/apache/tajo/storage/Tablespace.java     | 211 ++++++----
 .../apache/tajo/storage/TablespaceManager.java  | 390 +++++++++++++++++++
 .../src/main/resources/storage-default.json     |  20 +
 tajo-storage/tajo-storage-hbase/pom.xml         |  11 +
 .../storage/hbase/AbstractHBaseAppender.java    |   2 +-
 .../storage/hbase/AddSortForInsertRewriter.java |  92 -----
 .../tajo/storage/hbase/ColumnMapping.java       |  17 +-
 .../tajo/storage/hbase/HBaseFragment.java       |  28 +-
 .../tajo/storage/hbase/HBasePutAppender.java    |  13 +-
 .../apache/tajo/storage/hbase/HBaseScanner.java |   9 +-
 .../tajo/storage/hbase/HBaseTablespace.java     | 248 +++++++-----
 .../storage/hbase/SortedInsertRewriter.java     | 117 ++++++
 .../src/main/proto/StorageFragmentProtos.proto  |  15 +-
 .../tajo/storage/hbase/TestColumnMapping.java   |   2 +-
 .../storage/hbase/TestHBaseStorageManager.java  | 108 -----
 .../tajo/storage/hbase/TestHBaseTableSpace.java | 134 +++++++
 tajo-storage/tajo-storage-hdfs/pom.xml          |   6 +-
 .../org/apache/tajo/storage/FileAppender.java   |  28 +-
 .../org/apache/tajo/storage/FileTablespace.java | 237 +++++++----
 .../tajo/storage/HashShuffleAppender.java       |  87 ++---
 .../storage/HashShuffleAppenderManager.java     |  19 +-
 .../tajo/storage/TestCompressionStorages.java   |   4 +-
 .../tajo/storage/TestDelimitedTextFile.java     |   8 +-
 .../tajo/storage/TestFileStorageManager.java    | 233 -----------
 .../apache/tajo/storage/TestFileSystems.java    |   2 +-
 .../apache/tajo/storage/TestFileTablespace.java | 250 ++++++++++++
 .../org/apache/tajo/storage/TestLineReader.java |   8 +-
 .../apache/tajo/storage/TestMergeScanner.java   |   6 +-
 .../org/apache/tajo/storage/TestStorages.java   |  48 +--
 .../apache/tajo/storage/index/TestBSTIndex.java |  58 +--
 .../index/TestSingleCSVFileBSTIndex.java        |   4 +-
 .../apache/tajo/storage/json/TestJsonSerDe.java |   2 +-
 167 files changed, 3860 insertions(+), 2682 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/CHANGES
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 3d09eb1,c6b9b41..d76ec92
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@@ -48,10 -49,7 +48,9 @@@ import org.apache.tajo.plan.LogicalPlan
  import org.apache.tajo.plan.logical.*;
  import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
  import org.apache.tajo.plan.util.PlannerUtil;
 -import org.apache.tajo.storage.*;
 +import org.apache.tajo.storage.FileTablespace;
 +import org.apache.tajo.storage.StorageConstants;
- import org.apache.tajo.storage.TableSpaceManager;
- import org.apache.tajo.storage.Tablespace;
++import org.apache.tajo.storage.TablespaceManager;
  import org.apache.tajo.storage.fragment.FileFragment;
  import org.apache.tajo.storage.fragment.Fragment;
  import org.apache.tajo.storage.fragment.FragmentConvertor;
@@@ -1192,11 -1185,21 +1192,10 @@@ public class PhysicalPlannerImpl implem
      Preconditions.checkNotNull(ctx.getTable(annotation.getCanonicalName()),
          "Error: There is no table matched to %s", annotation.getCanonicalName());
  
 -    FragmentProto [] fragmentProtos = ctx.getTables(annotation.getTableName());
 -    List<FileFragment> fragments =
 -        FragmentConvertor.convert(ctx.getConf(), fragmentProtos);
 -
 -    String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys());
 -    FileTablespace sm = (FileTablespace) TablespaceManager.get(fragments.get(0).getPath().toUri()).get();
 -    String dbName = CatalogUtil.extractQualifier(annotation.getTableName());
 -    String simpleName = CatalogUtil.extractSimpleName(annotation.getTableName());
 -    Path indexPath = new Path(new Path(sm.getTableUri(dbName, simpleName)), "index");
 -
 -    TupleComparator comp = new BaseTupleComparator(annotation.getKeySchema(),
 -        annotation.getSortKeys());
 -    return new BSTIndexScanExec(ctx, annotation, fragments.get(0), new Path(indexPath, indexName),
 -        annotation.getKeySchema(), comp, annotation.getDatum());
 -
 +    FragmentProto [] fragments = ctx.getTables(annotation.getTableName());
- 
 +    Preconditions.checkState(fragments.length == 1);
 +    return new BSTIndexScanExec(ctx, annotation, fragments[0], annotation.getIndexPath(),
 +        annotation.getKeySchema(), annotation.getPredicates());
    }
  
    public static EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, LogicalNode node) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 34e6f5c,54abca8..521b6b9
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@@ -96,98 -67,12 +96,98 @@@ public class BSTIndexScanExec extends P
      this.reader.open();
    }
  
 +  private static Schema mergeSubSchemas(Schema originalSchema, Schema subSchema, Target[] targets, EvalNode qual) {
 +    Schema mergedSchema = new Schema();
 +    Set<Column> qualAndTargets = TUtil.newHashSet();
 +    qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(qual));
 +    for (Target target : targets) {
 +      qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(target.getEvalTree()));
 +    }
 +    for (Column column : originalSchema.getRootColumns()) {
 +      if (subSchema.contains(column)
 +          || qualAndTargets.contains(column)
 +          || qualAndTargets.contains(column)) {
 +        mergedSchema.addColumn(column);
 +      }
 +    }
 +    return mergedSchema;
 +  }
 +
    @Override
    public void init() throws IOException {
 +    Schema projected;
 +
 +    // in the case where projected column or expression are given
 +    // the target can be an empty list.
 +    if (plan.hasTargets()) {
 +      projected = new Schema();
 +      Set<Column> columnSet = new HashSet<Column>();
 +
 +      if (plan.hasQual()) {
 +        columnSet.addAll(EvalTreeUtil.findUniqueColumns(qual));
 +      }
 +
 +      for (Target t : plan.getTargets()) {
 +        columnSet.addAll(EvalTreeUtil.findUniqueColumns(t.getEvalTree()));
 +      }
 +
 +      for (Column column : inSchema.getAllColumns()) {
 +        if (columnSet.contains(column)) {
 +          projected.addColumn(column);
 +        }
 +      }
 +
 +    } else {
 +      // no any projected columns, meaning that all columns should be projected.
 +      // TODO - this implicit rule makes code readability bad. So, we should remove it later
 +      projected = outSchema;
 +    }
 +
 +    initScanner(projected);
      super.init();
      progress = 0.0f;
 -    if (qual != null) {
 -      qual.bind(context.getEvalContext(), inSchema);
 +
 +    if (plan.hasQual()) {
 +      if (fileScanner.isProjectable()) {
 +        qual.bind(context.getEvalContext(), projected);
 +      } else {
 +        qual.bind(context.getEvalContext(), inSchema);
 +      }
 +    }
 +  }
 +
 +  private void initScanner(Schema projected) throws IOException {
 +
 +    TableMeta meta;
 +    try {
 +      meta = (TableMeta) plan.getTableDesc().getMeta().clone();
 +    } catch (CloneNotSupportedException e) {
 +      throw new RuntimeException(e);
 +    }
 +
 +    // set system default properties
 +    PlannerUtil.applySystemDefaultToTableProperties(context.getQueryContext(), meta);
 +
 +    // Why we should check nullity? See https://issues.apache.org/jira/browse/TAJO-1422
 +    if (fragment != null) {
 +
 +      Schema fileScanOutSchema = mergeSubSchemas(projected, keySchema, plan.getTargets(), qual);
 +
-       this.fileScanner = TableSpaceManager.getStorageManager(context.getConf(),
++      this.fileScanner = OldStorageManager.getStorageManager(context.getConf(),
 +          plan.getTableDesc().getMeta().getStoreType())
 +          .getSeekableScanner(plan.getTableDesc().getMeta(), plan.getPhysicalSchema(), fragment, fileScanOutSchema);
 +      this.fileScanner.init();
 +
 +      // See Scanner.isProjectable() method Depending on the result of isProjectable(),
 +      // the width of retrieved tuple is changed.
 +      //
 +      // If TRUE, the retrieved tuple will contain only projected fields.
 +      // If FALSE, the retrieved tuple will contain projected fields and NullDatum for non-projected fields.
 +      if (fileScanner.isProjectable()) {
 +        this.projector = new Projector(context, projected, outSchema, plan.getTargets());
 +      } else {
 +        this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
 +      }
      }
    }
  

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 422d034,37b497c..91fce57
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@@ -43,7 -43,7 +43,6 @@@ import org.apache.tajo.ipc.ClientProtos
  import org.apache.tajo.master.TajoMaster.MasterContext;
  import org.apache.tajo.master.exec.DDLExecutor;
  import org.apache.tajo.master.exec.QueryExecutor;
--import org.apache.tajo.session.Session;
  import org.apache.tajo.plan.*;
  import org.apache.tajo.plan.logical.InsertNode;
  import org.apache.tajo.plan.logical.LogicalRootNode;
@@@ -53,10 -53,8 +52,10 @@@ import org.apache.tajo.plan.verifier.Lo
  import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier;
  import org.apache.tajo.plan.verifier.VerificationState;
  import org.apache.tajo.plan.verifier.VerifyException;
- import org.apache.tajo.storage.Tablespace;
- import org.apache.tajo.storage.TableSpaceManager;
++import org.apache.tajo.session.Session;
+ import org.apache.tajo.storage.TablespaceManager;
  import org.apache.tajo.util.CommonTestingUtil;
 +import org.apache.tajo.util.IPCUtil;
  
  import java.io.IOException;
  import java.sql.SQLException;
@@@ -95,9 -91,8 +92,9 @@@ public class GlobalEngine extends Abstr
      try  {
        analyzer = new SQLAnalyzer();
        preVerifier = new PreLogicalPlanVerifier(context.getCatalog());
-       planner = new LogicalPlanner(context.getCatalog());
+       planner = new LogicalPlanner(context.getCatalog(), TablespaceManager.getInstance());
 -      optimizer = new LogicalOptimizer(context.getConf());
 +      // Access path rewriter is enabled only in QueryMasterTask
 +      optimizer = new LogicalOptimizer(context.getConf(), context.getCatalog());
        annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());
      } catch (Throwable t) {
        LOG.error(t.getMessage(), t);

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index c6732b3,7dbe815..76bfb95
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@@ -835,12 -844,11 +835,11 @@@ public class TajoMasterClientService ex
          TableDesc desc;
          try {
            desc = context.getGlobalEngine().getDDLExecutor().createTable(queryContext, request.getName(),
-               meta.getStoreType(), schema,
-               meta, path, true, partitionDesc, false);
+               null, meta.getStoreType(), schema, meta, path.toUri(), true, partitionDesc, false);
          } catch (Exception e) {
            return TableResponse.newBuilder()
 -              .setResultCode(ResultCode.ERROR)
 -              .setErrorMessage(e.getMessage()).build();
 +              .setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, e.getMessage(), null))
 +              .build();
          }
  
          return TableResponse.newBuilder()

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 862e6f7,5d42157..2e5584c
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@@ -28,11 -28,10 +28,8 @@@ import org.apache.tajo.QueryId
  import org.apache.tajo.QueryIdFactory;
  import org.apache.tajo.SessionVars;
  import org.apache.tajo.TajoConstants;
--import org.apache.tajo.catalog.CatalogService;
- import org.apache.tajo.catalog.CatalogUtil;
--import org.apache.tajo.catalog.Schema;
--import org.apache.tajo.catalog.TableDesc;
 -import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.catalog.*;
 +import org.apache.tajo.catalog.exception.AlreadyExistsIndexException;
  import org.apache.tajo.catalog.proto.CatalogProtos;
  import org.apache.tajo.catalog.statistics.TableStats;
  import org.apache.tajo.common.TajoDataTypes;
@@@ -41,12 -40,13 +38,14 @@@ import org.apache.tajo.datum.DatumFacto
  import org.apache.tajo.engine.planner.global.GlobalPlanner;
  import org.apache.tajo.engine.planner.global.MasterPlan;
  import org.apache.tajo.engine.planner.physical.EvalExprExec;
- import org.apache.tajo.engine.planner.physical.StoreTableExec;
+ import org.apache.tajo.engine.planner.physical.InsertRowsExec;
  import org.apache.tajo.engine.query.QueryContext;
  import org.apache.tajo.ipc.ClientProtos;
 +import org.apache.tajo.ipc.ClientProtos.ResultCode;
  import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
- import org.apache.tajo.master.*;
+ import org.apache.tajo.master.QueryInfo;
+ import org.apache.tajo.master.QueryManager;
+ import org.apache.tajo.master.TajoMaster;
  import org.apache.tajo.master.exec.prehook.CreateTableHook;
  import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
  import org.apache.tajo.master.exec.prehook.InsertIntoHook;
@@@ -65,8 -61,8 +60,9 @@@ import org.apache.tajo.plan.function.py
  import org.apache.tajo.plan.logical.*;
  import org.apache.tajo.plan.util.PlannerUtil;
  import org.apache.tajo.plan.verifier.VerifyException;
+ import org.apache.tajo.session.Session;
  import org.apache.tajo.storage.*;
 +import org.apache.tajo.util.IPCUtil;
  import org.apache.tajo.util.ProtoUtil;
  import org.apache.tajo.worker.TaskAttemptContext;
  
@@@ -422,42 -384,89 +391,89 @@@ public class QueryExecutor 
          fs.rename(eachFile.getPath(), targetFilePath);
        }
      }
+   }
  
-     if (insertNode.hasTargetTable()) {
-       TableStats stats = tableDesc.getStats();
-       long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
-       stats.setNumBytes(volume);
-       stats.setNumRows(1);
- 
-       CatalogProtos.UpdateTableStatsProto.Builder builder = CatalogProtos.UpdateTableStatsProto.newBuilder();
-       builder.setTableName(tableDesc.getName());
-       builder.setStats(stats.getProto());
+   /**
+    * Insert row values
+    */
+   private void insertRowValues(QueryContext queryContext,
+                                InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder) {
+     try {
+       String nodeUniqName = insertNode.getTableName() == null ? new Path(insertNode.getUri()).getName() :
+           insertNode.getTableName();
+       String queryId = nodeUniqName + "_" + System.currentTimeMillis();
+ 
+       URI finalOutputUri = insertNode.getUri();
+       Tablespace space = TablespaceManager.get(finalOutputUri).get();
+       TableMeta tableMeta = new TableMeta(insertNode.getStorageType(), insertNode.getOptions());
+       tableMeta.putOption(StorageConstants.INSERT_DIRECTLY, Boolean.TRUE.toString());
+ 
+       FormatProperty formatProperty = space.getFormatProperty(tableMeta);
+ 
+       TaskAttemptContext taskAttemptContext;
+       if (formatProperty.directInsertSupported()) { // if this format and storage supports direct insertion
+         taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, null);
+         taskAttemptContext.setOutputPath(new Path(finalOutputUri));
+ 
+         EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
+         InsertRowsExec exec = new InsertRowsExec(taskAttemptContext, insertNode, evalExprExec);
+ 
+         try {
+           exec.init();
+           exec.next();
+         } finally {
+           exec.close();
+         }
+       } else {
+         URI stagingSpaceUri = space.prepareStagingSpace(context.getConf(), queryId, queryContext, tableMeta);
+         Path stagingDir = new Path(stagingSpaceUri);
+         Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
  
-       catalog.updateTableStats(builder.build());
+         taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, stagingDir);
+         taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000"));
+         insertRowsThroughStaging(taskAttemptContext, insertNode, new Path(finalOutputUri), stagingDir, stagingResultDir);
+       }
  
-       responseBuilder.setTableDesc(tableDesc.getProto());
-     } else {
+       // set insert stats (how many rows and bytes)
        TableStats stats = new TableStats();
-       long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
-       stats.setNumBytes(volume);
-       stats.setNumRows(1);
- 
-       // Empty TableDesc
-       List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>();
-       CatalogProtos.TableDescProto tableDescProto = CatalogProtos.TableDescProto.newBuilder()
-           .setTableName(nodeUniqName)
-           .setMeta(CatalogProtos.TableProto.newBuilder().setStoreType("CSV").build())
-           .setSchema(CatalogProtos.SchemaProto.newBuilder().addAllFields(columns).build())
-           .setStats(stats.getProto())
-           .build();
- 
-       responseBuilder.setTableDesc(tableDescProto);
-     }
+       stats.setNumBytes(taskAttemptContext.getResultStats().getNumBytes());
+       stats.setNumRows(taskAttemptContext.getResultStats().getNumRows());
+ 
+       if (insertNode.hasTargetTable()) {
+         CatalogProtos.UpdateTableStatsProto.Builder builder = CatalogProtos.UpdateTableStatsProto.newBuilder();
+         builder.setTableName(insertNode.getTableName());
+         builder.setStats(stats.getProto());
+ 
+         catalog.updateTableStats(builder.build());
+ 
+         TableDesc desc = new TableDesc(
+             insertNode.getTableName(),
+             insertNode.getTargetSchema(),
+             tableMeta,
+             finalOutputUri);
+         responseBuilder.setTableDesc(desc.getProto());
+ 
+       } else { // If INSERT INTO LOCATION
+ 
+         // Empty TableDesc
+         List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>();
+         CatalogProtos.TableDescProto tableDescProto = CatalogProtos.TableDescProto.newBuilder()
+             .setTableName(nodeUniqName)
+             .setMeta(CatalogProtos.TableProto.newBuilder().setStoreType("CSV").build())
+             .setSchema(CatalogProtos.SchemaProto.newBuilder().addAllFields(columns).build())
+             .setStats(stats.getProto())
+             .build();
+ 
+         responseBuilder.setTableDesc(tableDescProto);
+       }
  
-     // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows.
-     responseBuilder.setMaxRowNum(-1);
-     responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-     responseBuilder.setResult(IPCUtil.buildOkRequestResult());
+       // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows.
+       responseBuilder.setMaxRowNum(-1);
+       responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
 -      responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
++      responseBuilder.setResult(IPCUtil.buildOkRequestResult());
+     } catch (Throwable t) {
+       throw new RuntimeException(t);
+     }
    }
  
    public void executeDistributedQuery(QueryContext queryContext, Session session,
@@@ -502,47 -515,15 +522,34 @@@
      }
    }
  
 +  private void checkIndexExistence(final QueryContext queryContext, final CreateIndexNode createIndexNode)
 +      throws IOException {
 +    String databaseName, simpleIndexName, qualifiedIndexName;
 +    if (CatalogUtil.isFQTableName(createIndexNode.getIndexName())) {
 +      String[] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName());
 +      databaseName = splits[0];
 +      simpleIndexName = splits[1];
 +      qualifiedIndexName = createIndexNode.getIndexName();
 +    } else {
 +      databaseName = queryContext.getCurrentDatabase();
 +      simpleIndexName = createIndexNode.getIndexName();
 +      qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName);
 +    }
 +
 +    if (catalog.existIndexByName(databaseName, simpleIndexName)) {
 +      throw new AlreadyExistsIndexException(qualifiedIndexName);
 +    }
 +  }
 +
-   public static MasterPlan compileMasterPlan(LogicalPlan plan, QueryContext context, GlobalPlanner planner)
+   public MasterPlan compileMasterPlan(LogicalPlan plan, QueryContext context, GlobalPlanner planner)
        throws Exception {
  
-     String storeType = PlannerUtil.getStoreType(plan);
-     if (storeType != null) {
-       Tablespace sm = TableSpaceManager.getStorageManager(planner.getConf(), storeType);
-       StorageProperty storageProperty = sm.getStorageProperty();
-       if (storageProperty.isSortedInsert()) {
-         String tableName = PlannerUtil.getStoreTableName(plan);
-         LogicalRootNode rootNode = plan.getRootBlock().getRoot();
-         TableDesc tableDesc = PlannerUtil.getTableDesc(planner.getCatalog(), rootNode.getChild());
-         if (tableDesc == null) {
-           throw new VerifyException("Can't get table meta data from catalog: " + tableName);
-         }
-         List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules(
-             context, tableDesc);
-         if (storageSpecifiedRewriteRules != null) {
-           for (LogicalPlanRewriteRule eachRule: storageSpecifiedRewriteRules) {
-             eachRule.rewrite(new LogicalPlanRewriteRuleContext(context, plan));
-           }
-         }
-       }
+     LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+     TableDesc tableDesc = PlannerUtil.getTableDesc(planner.getCatalog(), rootNode.getChild());
+ 
+     if (tableDesc != null) {
+       Tablespace space = TablespaceManager.get(tableDesc.getUri()).get();
+       space.rewritePlan(context, plan);
      }
  
      MasterPlan masterPlan = new MasterPlan(QueryIdFactory.NULL_QUERY_ID, context, plan);

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index bb83b8c,9d5838d..23fa497
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@@ -30,10 -30,9 +30,11 @@@ import org.apache.hadoop.yarn.state.*
  import org.apache.hadoop.yarn.util.Clock;
  import org.apache.tajo.ExecutionBlockId;
  import org.apache.tajo.QueryId;
+ import org.apache.tajo.QueryVars;
  import org.apache.tajo.SessionVars;
  import org.apache.tajo.TajoProtos.QueryState;
 +import org.apache.tajo.catalog.*;
 +import org.apache.tajo.catalog.exception.CatalogException;
  import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
  import org.apache.tajo.catalog.CatalogService;
  import org.apache.tajo.catalog.TableDesc;

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index 0737422,1f5e7a3..550b1ee
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@@ -50,15 -40,16 +40,18 @@@ import org.apache.tajo.ipc.TajoWorkerPr
  import org.apache.tajo.master.TajoContainerProxy;
  import org.apache.tajo.master.event.*;
  import org.apache.tajo.master.rm.TajoWorkerResourceManager;
+ import org.apache.tajo.plan.LogicalOptimizer;
+ import org.apache.tajo.plan.LogicalPlan;
+ import org.apache.tajo.plan.LogicalPlanner;
  import org.apache.tajo.plan.logical.LogicalNode;
+ import org.apache.tajo.plan.logical.LogicalRootNode;
  import org.apache.tajo.plan.logical.NodeType;
  import org.apache.tajo.plan.logical.ScanNode;
- import org.apache.tajo.plan.verifier.VerifyException;
+ import org.apache.tajo.plan.util.PlannerUtil;
  import org.apache.tajo.session.Session;
 -import org.apache.tajo.storage.*;
++import org.apache.tajo.storage.FormatProperty;
 +import org.apache.tajo.storage.Tablespace;
- import org.apache.tajo.storage.StorageProperty;
- import org.apache.tajo.storage.StorageUtil;
- import org.apache.tajo.storage.TableSpaceManager;
++import org.apache.tajo.storage.TablespaceManager;
  import org.apache.tajo.util.metrics.TajoMetrics;
  import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
  import org.apache.tajo.worker.AbstractResourceAllocator;
@@@ -315,36 -300,22 +302,21 @@@ public class QueryMasterTask extends Co
          LOG.warn("Query already started");
          return;
        }
 -
 -
 +      LOG.info(SessionVars.INDEX_ENABLED.keyname() + " : " + queryContext.getBool(SessionVars.INDEX_ENABLED));
        CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
-       LogicalPlanner planner = new LogicalPlanner(catalog);
+       LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
 -      LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
 +      LogicalOptimizer optimizer = new LogicalOptimizer(systemConf, catalog);
        Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
        jsonExpr = null; // remove the possible OOM
+ 
        plan = planner.createPlan(queryContext, expr);
+       optimizer.optimize(queryContext, plan);
  
-       String storeType = PlannerUtil.getStoreType(plan);
-       if (storeType != null) {
-         sm = TableSpaceManager.getStorageManager(systemConf, storeType);
-         StorageProperty storageProperty = sm.getStorageProperty();
-         if (storageProperty.isSortedInsert()) {
-           String tableName = PlannerUtil.getStoreTableName(plan);
-           LogicalRootNode rootNode = plan.getRootBlock().getRoot();
-           TableDesc tableDesc =  PlannerUtil.getTableDesc(catalog, rootNode.getChild());
-           if (tableDesc == null) {
-             throw new VerifyException("Can't get table meta data from catalog: " + tableName);
-           }
-           List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules(
-               getQueryTaskContext().getQueryContext(), tableDesc);
-           if (storageSpecifiedRewriteRules != null) {
-             for (LogicalPlanRewriteRule eachRule: storageSpecifiedRewriteRules) {
-               optimizer.addRuleAfterToJoinOpt(eachRule);
-             }
-           }
-         }
-       }
+       // when a given uri is null, TablespaceManager.get will return the default tablespace.
+       space = TablespaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get();
+       space.rewritePlan(queryContext, plan);
  
-       optimizer.optimize(queryContext, plan);
+       initStagingDir();
  
        for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
          LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.SCAN);

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index 9118e73,07a09ad..2c336a4
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@@ -44,7 -42,8 +43,6 @@@ import org.apache.tajo.plan.expr.EvalCo
  import org.apache.tajo.plan.expr.EvalNode;
  import org.apache.tajo.plan.serder.EvalNodeDeserializer;
  import org.apache.tajo.plan.serder.EvalNodeSerializer;
 -import org.apache.tajo.engine.query.QueryContext;
--import org.apache.tajo.catalog.SchemaUtil;
  import org.apache.tajo.plan.serder.PlanProto;
  import org.apache.tajo.plan.verifier.LogicalPlanVerifier;
  import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier;
@@@ -66,9 -66,9 +65,7 @@@ import java.util.TimeZone
  
  import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
  import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
--import static org.junit.Assert.assertEquals;
--import static org.junit.Assert.assertFalse;
--import static org.junit.Assert.fail;
++import static org.junit.Assert.*;
  
  public class ExprTestBase {
    private static TajoTestingCluster util;
@@@ -103,8 -103,8 +100,8 @@@
  
      analyzer = new SQLAnalyzer();
      preLogicalPlanVerifier = new PreLogicalPlanVerifier(cat);
-     planner = new LogicalPlanner(cat);
+     planner = new LogicalPlanner(cat, TablespaceManager.getInstance());
 -    optimizer = new LogicalOptimizer(util.getConfiguration());
 +    optimizer = new LogicalOptimizer(util.getConfiguration(), cat);
      annotatedPlanVerifier = new LogicalPlanVerifier(util.getConfiguration(), cat);
    }
  

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
index 133dec1,afa3472..8042aef
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
@@@ -103,10 -103,10 +103,11 @@@ public class TestLogicalOptimizer 
  
      catalog.createFunction(funcDesc);
      sqlAnalyzer = new SQLAnalyzer();
-     planner = new LogicalPlanner(catalog);
+     planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
 -    optimizer = new LogicalOptimizer(util.getConfiguration());
++    optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
  
      defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration());
 +    optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
    }
  
    @AfterClass

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index dbd6e6d,0f37763..0cd5ced
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@@ -881,31 -889,8 +889,31 @@@ public class TestLogicalPlanner 
    }
  
    @Test
 +  public final void testCreateIndexNode() throws PlanningException {
 +    QueryContext qc = new QueryContext(util.getConfiguration(), session);
 +    Expr expr = sqlAnalyzer.parse(QUERIES[11]);
 +    LogicalPlan rootNode = planner.createPlan(qc, expr);
 +    LogicalNode plan = rootNode.getRootBlock().getRoot();
 +    testJsonSerDerObject(plan);
 +
 +    LogicalRootNode root = (LogicalRootNode) plan;
 +    assertEquals(NodeType.CREATE_INDEX, root.getChild().getType());
 +    CreateIndexNode createIndexNode = root.getChild();
 +
 +    assertEquals(NodeType.PROJECTION, createIndexNode.getChild().getType());
 +    ProjectionNode projNode = createIndexNode.getChild();
 +
 +    assertEquals(NodeType.SELECTION, projNode.getChild().getType());
 +    SelectionNode selNode = projNode.getChild();
 +
 +    assertEquals(NodeType.SCAN, selNode.getChild().getType());
 +    ScanNode scanNode = selNode.getChild();
 +    assertEquals(CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "employee"), scanNode.getTableName());
 +  }
 +
 +  @Test
    public final void testAsterisk() throws CloneNotSupportedException, PlanningException {
-     QueryContext qc = new QueryContext(util.getConfiguration(), session);
+     QueryContext qc = createQueryContext();
  
      Expr expr = sqlAnalyzer.parse(QUERIES[13]);
      LogicalPlan planNode = planner.createPlan(qc, expr);

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index 2178d5c,1b64a8f..0684ff6
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@@ -28,14 -28,14 +28,15 @@@ import org.apache.tajo.conf.TajoConf
  import org.apache.tajo.datum.Datum;
  import org.apache.tajo.datum.DatumFactory;
  import org.apache.tajo.engine.parser.SQLAnalyzer;
--import org.apache.tajo.engine.planner.*;
++import org.apache.tajo.engine.planner.PhysicalPlanner;
++import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
  import org.apache.tajo.engine.planner.enforce.Enforcer;
++import org.apache.tajo.engine.query.QueryContext;
  import org.apache.tajo.plan.LogicalOptimizer;
  import org.apache.tajo.plan.LogicalPlan;
  import org.apache.tajo.plan.LogicalPlanner;
  import org.apache.tajo.plan.PlanningException;
  import org.apache.tajo.plan.logical.LogicalNode;
--import org.apache.tajo.engine.query.QueryContext;
  import org.apache.tajo.storage.*;
  import org.apache.tajo.storage.fragment.FileFragment;
  import org.apache.tajo.util.CommonTestingUtil;
@@@ -130,8 -128,8 +131,8 @@@ public class TestHashAntiJoinExec 
      people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
      catalog.createTable(people);
      analyzer = new SQLAnalyzer();
-     planner = new LogicalPlanner(catalog);
+     planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
 -    optimizer = new LogicalOptimizer(conf);
 +    optimizer = new LogicalOptimizer(conf, catalog);
    }
  
    @After

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index 697624a,afa273b..cbf0739
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@@ -135,8 -133,8 +135,8 @@@ public class TestHashSemiJoinExec 
      people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
      catalog.createTable(people);
      analyzer = new SQLAnalyzer();
-     planner = new LogicalPlanner(catalog);
+     planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
 -    optimizer = new LogicalOptimizer(conf);
 +    optimizer = new LogicalOptimizer(conf, catalog);
    }
  
    @After

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 8fe03be,dff0cbe..cdfb8cc
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@@ -162,13 -162,12 +162,12 @@@ public class TestPhysicalPlanner 
      }
      appender.flush();
      appender.close();
 +
 +    defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
      catalog.createTable(score);
      analyzer = new SQLAnalyzer();
-     planner = new LogicalPlanner(catalog);
+     planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
 -    optimizer = new LogicalOptimizer(conf);
 -
 -    defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
 +    optimizer = new LogicalOptimizer(conf, catalog);
- 
      masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
  
      createLargeScoreTable();
@@@ -971,19 -970,14 +970,19 @@@
    }
  
    public final String [] createIndexStmt = {
 -      "create index idx_employee on employee using bst (name null first, empId desc)"
 +      "create index idx_employee on employee using TWO_LEVEL_BIN_TREE (name null first, empId desc)"
    };
  
 -  //@Test
 +  @Test
    public final void testCreateIndex() throws IOException, PlanningException {
      FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
-         new Path(employee.getPath()), Integer.MAX_VALUE);
+         new Path(employee.getUri()), Integer.MAX_VALUE);
      Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testCreateIndex");
 +    Path indexPath = StorageUtil.concatPath(TajoConf.getWarehouseDir(conf), "default/idx_employee");
 +    if (sm.getFileSystem().exists(indexPath)) {
 +      sm.getFileSystem().delete(indexPath, true);
 +    }
 +
      TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
          LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
          new FileFragment[] {frags[0]}, workDir);

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index b54b4d9,4690e71..a19ac34
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@@ -30,11 -30,11 +30,17 @@@ import org.apache.tajo.conf.TajoConf
  import org.apache.tajo.datum.Datum;
  import org.apache.tajo.datum.DatumFactory;
  import org.apache.tajo.engine.parser.SQLAnalyzer;
--import org.apache.tajo.engine.planner.*;
++import org.apache.tajo.engine.planner.PhysicalPlanner;
++import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
++import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
++import org.apache.tajo.engine.planner.UniformRangePartition;
  import org.apache.tajo.engine.planner.enforce.Enforcer;
--import org.apache.tajo.plan.*;
--import org.apache.tajo.plan.logical.LogicalNode;
  import org.apache.tajo.engine.query.QueryContext;
++import org.apache.tajo.plan.LogicalOptimizer;
++import org.apache.tajo.plan.LogicalPlan;
++import org.apache.tajo.plan.LogicalPlanner;
++import org.apache.tajo.plan.PlanningException;
++import org.apache.tajo.plan.logical.LogicalNode;
  import org.apache.tajo.plan.util.PlannerUtil;
  import org.apache.tajo.storage.*;
  import org.apache.tajo.storage.fragment.FileFragment;
@@@ -101,10 -100,9 +107,10 @@@ public class TestSortExec 
          tablePath.toUri());
      catalog.createTable(desc);
  
 +    queryContext = new QueryContext(conf);
      analyzer = new SQLAnalyzer();
-     planner = new LogicalPlanner(catalog);
+     planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
 -    optimizer = new LogicalOptimizer(conf);
 +    optimizer = new LogicalOptimizer(conf, catalog);
    }
  
    public static String[] QUERIES = {

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index af093a8,7c61cc7..e5b8470
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@@ -80,10 -79,9 +79,9 @@@ public class TestExecutionBlockCursor 
      }
  
      analyzer = new SQLAnalyzer();
-     logicalPlanner = new LogicalPlanner(catalog);
+     logicalPlanner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
 -    optimizer = new LogicalOptimizer(conf);
 +    optimizer = new LogicalOptimizer(conf, catalog);
  
-     Tablespace sm  = TableSpaceManager.getFileStorageManager(conf);
      dispatcher = new AsyncDispatcher();
      dispatcher.init(conf);
      dispatcher.start();

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index 10c6836,1351716..e7f51a8
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@@ -37,7 -37,7 +37,10 @@@ import org.apache.tajo.engine.query.Tas
  import org.apache.tajo.ipc.QueryCoordinatorProtocol;
  import org.apache.tajo.ipc.TajoWorkerProtocol;
  import org.apache.tajo.master.cluster.WorkerConnectionInfo;
--import org.apache.tajo.master.event.*;
++import org.apache.tajo.master.event.QueryEvent;
++import org.apache.tajo.master.event.QueryEventType;
++import org.apache.tajo.master.event.StageEvent;
++import org.apache.tajo.master.event.StageEventType;
  import org.apache.tajo.plan.LogicalOptimizer;
  import org.apache.tajo.plan.LogicalPlan;
  import org.apache.tajo.plan.LogicalPlanner;
@@@ -104,8 -105,8 +108,8 @@@ public class TestKillQuery 
      Session session = LocalTajoTestingUtility.createDummySession();
      CatalogService catalog = cluster.getMaster().getCatalog();
  
-     LogicalPlanner planner = new LogicalPlanner(catalog);
+     LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
 -    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
 +    LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
      Expr expr =  analyzer.parse(queryStr);
      LogicalPlan plan = planner.createPlan(defaultContext, expr);
  
@@@ -168,8 -169,8 +172,8 @@@
      Session session = LocalTajoTestingUtility.createDummySession();
      CatalogService catalog = cluster.getMaster().getCatalog();
  
-     LogicalPlanner planner = new LogicalPlanner(catalog);
+     LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
 -    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
 +    LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
      Expr expr =  analyzer.parse(queryStr);
      LogicalPlan plan = planner.createPlan(defaultContext, expr);
  

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------
diff --cc tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
index 48b74c0,a2480c9..d32045f
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
@@@ -49,7 -48,9 +50,8 @@@ import org.apache.tajo.plan.nameresolve
  import org.apache.tajo.plan.rewrite.rules.ProjectionPushDownRule;
  import org.apache.tajo.plan.util.ExprFinder;
  import org.apache.tajo.plan.util.PlannerUtil;
 -import org.apache.tajo.catalog.SchemaUtil;
  import org.apache.tajo.plan.verifier.VerifyException;
+ import org.apache.tajo.storage.StorageService;
  import org.apache.tajo.util.KeyValueSet;
  import org.apache.tajo.util.Pair;
  import org.apache.tajo.util.StringUtils;

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-plan/src/main/proto/Plan.proto
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
index 0000000,ef33a8e..d12c6bd
mode 000000,100644..100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
@@@ -1,0 -1,250 +1,234 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.tajo.storage;
+ 
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.base.Preconditions;
+ import com.google.common.collect.Maps;
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.tajo.TaskAttemptId;
+ import org.apache.tajo.catalog.Schema;
+ import org.apache.tajo.catalog.TableMeta;
+ import org.apache.tajo.conf.TajoConf;
+ import org.apache.tajo.storage.fragment.Fragment;
+ 
+ import java.io.IOException;
+ import java.lang.reflect.Constructor;
+ import java.net.URI;
+ import java.util.Map;
+ import java.util.concurrent.ConcurrentHashMap;
+ 
+ /**
+  * It handles available tablespaces and caches Tablespace instances.
+  */
+ public class OldStorageManager {
+   private static final Log LOG = LogFactory.getLog(OldStorageManager.class);
+ 
+   /**
+    * Cache of scanner handlers for each storage type.
+    */
+   protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE
+       = new ConcurrentHashMap<String, Class<? extends Scanner>>();
+   /**
+    * Cache of appender handlers for each storage type.
+    */
+   protected static final Map<String, Class<? extends Appender>> APPENDER_HANDLER_CACHE
+       = new ConcurrentHashMap<String, Class<? extends Appender>>();
+   private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
+       Configuration.class,
+       Schema.class,
+       TableMeta.class,
+       Fragment.class
+   };
+   private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
+       Configuration.class,
+       TaskAttemptId.class,
+       Schema.class,
+       TableMeta.class,
+       Path.class
+   };
+   /**
+    * Cache of Tablespace.
+    * Key is manager key(warehouse path) + store type
+    */
+   private static final Map<String, Tablespace> storageManagers = Maps.newHashMap();
+   /**
+    * Cache of constructors for each class. Pins the classes so they
+    * can't be garbage collected until OldStorageManager can be collected.
+    */
+   protected static Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
+ 
+   /**
+    * Clear all class cache
+    */
+   @VisibleForTesting
+   protected synchronized static void clearCache() {
+     CONSTRUCTOR_CACHE.clear();
+     SCANNER_HANDLER_CACHE.clear();
+     APPENDER_HANDLER_CACHE.clear();
+     storageManagers.clear();
+   }
+ 
+   /**
+    * Close Tablespace
+    * @throws java.io.IOException
+    */
+   public static void shutdown() throws IOException {
+     synchronized(storageManagers) {
+       for (Tablespace eachTablespace : storageManagers.values()) {
+         eachTablespace.close();
+       }
+     }
+     clearCache();
+   }
+ 
+   /**
+    * Returns the proper Tablespace instance according to the storeType.
+    *
+    * @param tajoConf Tajo system property.
+    * @param storeType Storage type
+    * @return
+    * @throws IOException
+    */
+   public static Tablespace getStorageManager(TajoConf tajoConf, String storeType) throws IOException {
+     FileSystem fileSystem = TajoConf.getWarehouseDir(tajoConf).getFileSystem(tajoConf);
+     if (fileSystem != null) {
+       return getStorageManager(tajoConf, fileSystem.getUri(), storeType);
+     } else {
+       return getStorageManager(tajoConf, null, storeType);
+     }
+   }
+ 
+   /**
+    * Returns the proper Tablespace instance according to the storeType
+    *
+    * Instances are kept in storageManagers under the key "typeName_uri"; a new instance is
+    * created, initialized with tajoConf and registered only when no entry exists for that key.
+    *
+    * @param tajoConf Tajo system property.
+    * @param uri Key that can identify each storage manager(may be a path)
+    * @param storeType Storage type
+    * @return The registered Tablespace for the key, created on first request
+    * @throws IOException If no Tablespace class is configured for the resolved type name
+    */
+   public static synchronized Tablespace getStorageManager(
+       TajoConf tajoConf, URI uri, String storeType) throws IOException {
+     Preconditions.checkNotNull(tajoConf);
+     Preconditions.checkNotNull(uri);
+     Preconditions.checkNotNull(storeType);
+ 
+     // Any storeType other than "HBASE" (compared case-insensitively) is handled as "hdfs".
+     String typeName;
+     if (storeType.equalsIgnoreCase("HBASE")) {
+       typeName = "hbase";
+     } else {
+       typeName = "hdfs";
+     }
+ 
+     synchronized (storageManagers) {
+       String storeKey = typeName + "_" + uri.toString();
+       Tablespace manager = storageManagers.get(storeKey);
+ 
+       if (manager == null) {
+         // The implementation class is read from the configuration key tajo.storage.manager.<typeName>.class.
+         Class<? extends Tablespace> storageManagerClass =
+             tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, Tablespace.class);
+ 
+         if (storageManagerClass == null) {
+           throw new IOException("Unknown Storage Type: " + typeName);
+         }
+ 
+         try {
+           // Look the constructor up once per class and reuse it from CONSTRUCTOR_CACHE afterwards.
+           Constructor<? extends Tablespace> constructor =
+               (Constructor<? extends Tablespace>) CONSTRUCTOR_CACHE.get(storageManagerClass);
+           if (constructor == null) {
+             constructor = storageManagerClass.getDeclaredConstructor(TablespaceManager.TABLESPACE_PARAM);
+             constructor.setAccessible(true);
+             CONSTRUCTOR_CACHE.put(storageManagerClass, constructor);
+           }
+           manager = constructor.newInstance(new Object[]{"noname", uri});
+         } catch (Exception e) {
+           // Reflection failures are rethrown unchecked, with the original exception as the cause.
+           throw new RuntimeException(e);
+         }
+         manager.init(tajoConf);
+         storageManagers.put(storeKey, manager);
+       }
+ 
+       return manager;
+     }
+   }
+ 
+   /**
 -   * Returns Scanner instance.
 -   *
 -   * @param conf The system property
 -   * @param meta The table meta
 -   * @param schema The input schema
 -   * @param fragment The fragment for scanning
 -   * @param target The output schema
 -   * @return Scanner instance
 -   * @throws IOException
 -   */
 -  public static synchronized SeekableScanner getSeekableScanner(
 -      TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
 -    return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target);
 -  }
 -
 -  /**
+    * Creates a scanner instance by reflection.
+    * The constructor taking DEFAULT_SCANNER_PARAMS is looked up once per class and then reused
+    * from CONSTRUCTOR_CACHE. Any failure is rethrown as a RuntimeException wrapping the cause.
+    *
+    * @param theClass Concrete class of scanner
+    * @param conf System property
+    * @param schema Input schema
+    * @param meta Table meta data
+    * @param fragment The fragment for scanning
+    * @param <T> Type of the scanner to create
+    * @return The scanner instance
+    */
+   public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta,
+                                          Fragment fragment) {
+     T result;
+     try {
+       Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+       if (meth == null) {
+         meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
+         meth.setAccessible(true);
+         CONSTRUCTOR_CACHE.put(theClass, meth);
+       }
+       result = meth.newInstance(new Object[]{conf, schema, meta, fragment});
+     } catch (Exception e) {
+       throw new RuntimeException(e);
+     }
+ 
+     return result;
+   }
+ 
+   /**
+    * Creates an appender instance by reflection.
+    *
+    * @param theClass Concrete class of appender
+    * @param conf System property
+    * @param taskAttemptId Task id
+    * @param meta Table meta data
+    * @param schema Input schema
+    * @param workDir Working directory
+    * @param <T> Type of the appender to create
+    * @return The appender instance
+    */
+   public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TaskAttemptId taskAttemptId,
+                                           TableMeta meta, Schema schema, Path workDir) {
+     T result;
+     try {
+       Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+       if (meth == null) {
+         meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
+         meth.setAccessible(true);
+         CONSTRUCTOR_CACHE.put(theClass, meth);
+       }
+       // NOTE(review): schema is passed before meta, the reverse of this method's parameter order;
+       // presumably this matches DEFAULT_APPENDER_PARAMS (declared outside this view) -- confirm.
+       result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, meta, workDir});
+     } catch (Exception e) {
+       // Any lookup or instantiation failure is rethrown unchecked, wrapping the cause.
+       throw new RuntimeException(e);
+     }
+ 
+     return result;
+   }
+ }

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
index c89f043,52e223d..0995e91
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
@@@ -245,22 -261,15 +261,30 @@@ public abstract class Tablespace 
      return scanner;
    }
  
+   /**
+    * Returns an Appender by forwarding all arguments unchanged to getAppender.
+    * NOTE(review): presumably an extension point for tablespaces that need a different
+    * appender for row inserts -- confirm against the subclasses.
+    *
+    * @param queryContext Query property.
+    * @param taskAttemptId Task id.
+    * @param meta Table meta data.
+    * @param schema Schema handed on to getAppender.
+    * @param workDir Working directory.
+    * @return The Appender returned by getAppender
+    * @throws IOException
+    */
+   public Appender getAppenderForInsertRow(OverridableConf queryContext,
+                                           TaskAttemptId taskAttemptId,
+                                           TableMeta meta,
+                                           Schema schema,
+                                           Path workDir) throws IOException {
+     return getAppender(queryContext, taskAttemptId, meta, schema, workDir);
+   }
+ 
    /**
 +   * Returns Scanner instance.
 +   *
 +   * The scanner is obtained from getScanner and cast to SeekableScanner, so the call fails with
 +   * a ClassCastException when the scanner returned for this table is not seekable.
 +   *
 +   * @param meta The table meta
 +   * @param schema The input schema
 +   * @param fragment The fragment for scanning
 +   * @param target The output schema
 +   * @return Scanner instance
 +   * @throws IOException
 +   */
 +  public synchronized SeekableScanner getSeekableScanner(TableMeta meta, Schema schema, FragmentProto fragment,
 +                                                         Schema target) throws IOException {
 +    return (SeekableScanner)this.getScanner(meta, schema, fragment, target);
 +  }
 +
 +  /**
     * Returns Appender instance.
     * @param queryContext Query property.
     * @param taskAttemptId Task id.

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index 845c2d7,18bb7ed..bcb02c6
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@@ -49,13 -49,9 +49,10 @@@ import org.apache.tajo.plan.logical.Cre
  import org.apache.tajo.plan.logical.LogicalNode;
  import org.apache.tajo.plan.logical.NodeType;
  import org.apache.tajo.plan.logical.ScanNode;
- import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
++import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
  import org.apache.tajo.storage.*;
  import org.apache.tajo.storage.fragment.Fragment;
- import org.apache.tajo.util.Bytes;
- import org.apache.tajo.util.BytesUtils;
- import org.apache.tajo.util.Pair;
- import org.apache.tajo.util.TUtil;
+ import org.apache.tajo.util.*;
  
  import java.io.BufferedReader;
  import java.io.IOException;
@@@ -1062,13 -1089,10 +1090,10 @@@ public class HBaseTablespace extends Ta
      }
    }
  
-   public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException {
-     if ("false".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) {
-       List<LogicalPlanRewriteRule> rules = new ArrayList<LogicalPlanRewriteRule>();
-       rules.add(new AddSortForInsertRewriter(tableDesc, getIndexColumns(tableDesc)));
-       return rules;
-     } else {
-       return null;
+   /**
+    * Applies REWRITE_RULE to the plan when the rule reports the plan as eligible;
+    * otherwise the plan is left untouched. A separate LogicalPlanRewriteRuleContext is
+    * built from the same context and plan for the eligibility check and for the rewrite.
+    */
+   @Override
+   public void rewritePlan(OverridableConf context, LogicalPlan plan) throws PlanningException {
 -    if (REWRITE_RULE.isEligible(context, plan)) {
 -      REWRITE_RULE.rewrite(context, plan);
++    if (REWRITE_RULE.isEligible(new LogicalPlanRewriteRuleContext(context, plan))) {
++      REWRITE_RULE.rewrite(new LogicalPlanRewriteRuleContext(context, plan));
      }
    }
  

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
index 0000000,ebf557e..b6143f3
mode 000000,100644..100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
@@@ -1,0 -1,116 +1,117 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.tajo.storage.hbase;
+ 
 -import org.apache.tajo.OverridableConf;
+ import org.apache.tajo.catalog.Column;
+ import org.apache.tajo.catalog.Schema;
+ import org.apache.tajo.catalog.SortSpec;
+ import org.apache.tajo.plan.LogicalPlan;
+ import org.apache.tajo.plan.PlanningException;
+ import org.apache.tajo.plan.logical.*;
+ import org.apache.tajo.plan.logical.SortNode.SortPurpose;
+ import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
++import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
+ import org.apache.tajo.util.KeyValueSet;
+ 
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.List;
+ 
+ /**
+  * This rewrite rule injects a sort operation to preserve the writing rows in
+  * an ascending order of HBase row keys, required by HFile.
+  */
+ public class SortedInsertRewriter implements LogicalPlanRewriteRule {
+ 
+   @Override
+   public String getName() {
+     return "SortedInsertRewriter";
+   }
+ 
+   /**
+    * Tells whether the sort injection applies: only when INSERT_PUT_MODE is "false" (the default
+    * used here) and the child of the plan root is a CREATE_TABLE or INSERT node.
+    */
+   @Override
 -  public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
 -    boolean hbaseMode = "false".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"));
 -    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
++  public boolean isEligible(LogicalPlanRewriteRuleContext context) {
++    boolean hbaseMode = "false".equalsIgnoreCase(context.getQueryContext().get(HBaseStorageConstants.INSERT_PUT_MODE, "false"));
++    LogicalRootNode rootNode = context.getPlan().getRootBlock().getRoot();
+     LogicalNode node = rootNode.getChild();
+     // The grouping is required: && binds tighter than ||, so without the parentheses every
+     // INSERT plan would be eligible even when put mode is enabled.
+     return hbaseMode && (node.getType() == NodeType.CREATE_TABLE || node.getType() == NodeType.INSERT);
+   }
+ 
+   /**
+    * Collects the columns of tableSchema that the column mapping marks as row keys,
+    * in schema order.
+    *
+    * @param tableSchema Schema of the target table
+    * @param tableProperty Table options from which the ColumnMapping is built
+    * @return The row-key columns; empty when no column is mapped to the row key
+    * @throws IOException
+    */
+   public static Column[] getIndexColumns(Schema tableSchema, KeyValueSet tableProperty) throws IOException {
+     List<Column> indexColumns = new ArrayList<Column>();
+ 
+     ColumnMapping columnMapping = new ColumnMapping(tableSchema, tableProperty);
+ 
+     boolean[] isRowKeys = columnMapping.getIsRowKeyMappings();
+     for (int i = 0; i < isRowKeys.length; i++) {
+       if (isRowKeys[i]) {
+         indexColumns.add(tableSchema.getColumn(i));
+       }
+     }
+ 
+     return indexColumns.toArray(new Column[]{});
+   }
+ 
+   /**
+    * Inserts a SortNode between the store-table node under the plan root and its child.
+    * The sort is ascending on the row-key columns of the target table.
+    *
+    * @return The same plan instance, modified in place
+    * @throws PlanningException If the row-key columns cannot be resolved
+    */
+   @Override
 -  public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws PlanningException {
++  public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws PlanningException {
++    LogicalPlan plan = context.getPlan();
+     LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+ 
+     StoreTableNode storeTable = rootNode.getChild();
+     Schema tableSchema = storeTable.getTableSchema();
+ 
+     Column[] sortColumns;
+     try {
+       sortColumns = getIndexColumns(tableSchema, storeTable.getOptions());
+     } catch (IOException e) {
+       throw new PlanningException(e);
+     }
+ 
+     // Positions of the row-key columns within the table schema.
+     int[] sortColumnIndexes = new int[sortColumns.length];
+     for (int i = 0; i < sortColumns.length; i++) {
+       sortColumnIndexes[i] = tableSchema.getColumnId(sortColumns[i].getQualifiedName());
+     }
+ 
+     UnaryNode insertNode = rootNode.getChild();
+     LogicalNode childNode = insertNode.getChild();
+ 
+     // The sort node passes rows through unchanged, so its input and output schema are both
+     // the output schema of the node it will sit on top of.
+     Schema sortSchema = childNode.getOutSchema();
+     SortNode sortNode = plan.createNode(SortNode.class);
+     sortNode.setSortPurpose(SortPurpose.STORAGE_SPECIFIED);
+     sortNode.setInSchema(sortSchema);
+     sortNode.setOutSchema(sortSchema);
+ 
+     SortSpec[] sortSpecs = new SortSpec[sortColumns.length];
+     int index = 0;
+ 
+     // The table-schema positions are applied to the child's output schema to pick the sort keys.
+     for (int i = 0; i < sortColumnIndexes.length; i++) {
+       Column sortColumn = sortSchema.getColumn(sortColumnIndexes[i]);
+       if (sortColumn == null) {
+         throw new PlanningException("Can't find proper sort column:" + sortColumns[i]);
+       }
+       sortSpecs[index++] = new SortSpec(sortColumn, true, true);
+     }
+     sortNode.setSortSpecs(sortSpecs);
+ 
+     // Splice the sort node in between the store-table node and its former child.
+     sortNode.setChild(insertNode.getChild());
+     insertNode.setChild(sortNode);
+     plan.getRootBlock().registerNode(sortNode);
+ 
+     return plan;
+   }
+ }

http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index 2fab8db,22fb607..f0a302a
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@@ -124,8 -125,7 +125,8 @@@ public class TestBSTIndex 
      creater.setLoadNum(LOAD_NUM);
      creater.open();
  
-     SeekableScanner scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
 -    SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++    SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
 +        getSeekableScanner(meta, schema, tablet.getProto(), schema);
      scanner.init();
  
      Tuple keyTuple;
@@@ -148,8 -148,7 +149,8 @@@
      tuple = new VTuple(keySchema.size());
      BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + storeType + ".idx"), keySchema, comp);
      reader.open();
-     scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
 -    scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++    scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
 +        getSeekableScanner(meta, schema, tablet.getProto(), schema);
      scanner.init();
  
      for (int i = 0; i < TUPLE_NUM - 1; i++) {
@@@ -228,8 -227,7 +229,8 @@@
      BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"),
          keySchema, comp);
      reader.open();
-     SeekableScanner scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
 -    SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++    SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
 +        getSeekableScanner(meta, schema, tablet.getProto(), schema);
      scanner.init();
  
      for (int i = 0; i < TUPLE_NUM - 1; i++) {
@@@ -292,8 -290,7 +293,8 @@@
      creater.setLoadNum(LOAD_NUM);
      creater.open();
  
-     SeekableScanner scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
 -    SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++    SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
 +        getSeekableScanner(meta, schema, tablet.getProto(), schema);
      scanner.init();
  
      Tuple keyTuple;
@@@ -364,8 -362,7 +366,8 @@@
      creater.setLoadNum(LOAD_NUM);
      creater.open();
  
-     SeekableScanner scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
 -    SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++    SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
 +        getSeekableScanner(meta, schema, tablet.getProto(), schema);
      scanner.init();
  
      Tuple keyTuple;
@@@ -388,8 -385,7 +390,8 @@@
      BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"),
          keySchema, comp);
      reader.open();
-     scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
 -    scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++    scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
 +        getSeekableScanner(meta, schema, tablet.getProto(), schema);
      scanner.init();
  
      Tuple result;
@@@ -457,8 -453,7 +459,8 @@@
      creater.setLoadNum(LOAD_NUM);
      creater.open();
  
-     SeekableScanner scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
 -    SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++    SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
 +        getSeekableScanner(meta, schema, tablet.getProto(), schema);
      scanner.init();
  
      Tuple keyTuple;
@@@ -481,8 -476,7 +483,8 @@@
      BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"),
          keySchema, comp);
      reader.open();
-     scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
 -    scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++    scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
 +        getSeekableScanner(meta, schema, tablet.getProto(), schema);
      scanner.init();
  
      Tuple result;
@@@ -538,8 -533,7 +541,8 @@@
      creater.setLoadNum(LOAD_NUM);
      creater.open();
  
-     SeekableScanner scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
 -    SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++    SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
 +        getSeekableScanner(meta, schema, tablet.getProto(), schema);
      scanner.init();
  
      Tuple keyTuple;
@@@ -564,8 -558,7 +567,8 @@@
      BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindMinValue_" + storeType + ".idx"),
          keySchema, comp);
      reader.open();
-     scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
 -    scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++    scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
 +        getSeekableScanner(meta, schema, tablet.getProto(), schema);
      scanner.init();
  
      tuple.put(0, DatumFactory.createInt8(0));
@@@ -623,8 -617,7 +627,8 @@@
      creater.setLoadNum(LOAD_NUM);
      creater.open();
  
-     SeekableScanner scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
 -    SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++    SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
 +        getSeekableScanner(meta, schema, tablet.getProto(), schema);
      scanner.init();
  
      Tuple keyTuple;
@@@ -729,8 -723,7 +734,8 @@@
      creater.setLoadNum(LOAD_NUM);
      creater.open();
  
-     SeekableScanner scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
 -    SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++    SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
 +        getSeekableScanner(meta, schema, tablet.getProto(), schema);
      scanner.init();
  
      Tuple keyTuple;
@@@ -811,8 -805,7 +817,8 @@@
      creater.setLoadNum(LOAD_NUM);
      creater.open();
  
-     SeekableScanner scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
 -    SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++    SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
 +        getSeekableScanner(meta, schema, tablet.getProto(), schema);
      scanner.init();
  
      Tuple keyTuple;
@@@ -837,8 -830,7 +843,8 @@@
      BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"),
          keySchema, comp);
      reader.open();
-     scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
 -    scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++    scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
 +        getSeekableScanner(meta, schema, tablet.getProto(), schema);
      scanner.init();
  
      for (int i = (TUPLE_NUM - 1); i > 0; i--) {
@@@ -903,8 -895,7 +909,8 @@@
      creater.setLoadNum(LOAD_NUM);
      creater.open();
  
-     SeekableScanner scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
 -    SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++    SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
 +        getSeekableScanner(meta, schema, tablet.getProto(), schema);
      scanner.init();
  
      Tuple keyTuple;
@@@ -932,8 -923,7 +938,8 @@@
      assertEquals(keySchema, reader.getKeySchema());
      assertEquals(comp, reader.getComparator());
  
-     scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
 -    scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++    scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
 +        getSeekableScanner(meta, schema, tablet.getProto(), schema);
      scanner.init();
  
      Tuple result;