Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/13 05:29:58 UTC

[3/4] TAJO-178: Implements StorageManager for scanning asynchronously. (hyoungjunkim via hyunsik)
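
The hunks below cover two things: a new stylesheet for the worker web UI, and a sweep over the backend and storage tests replacing the old static StorageManager calls with an AbstractStorageManager obtained from the new StorageManagerFactory. Judging from the tajo.storage.manager.v2 flag and the StorageManagerV2 import that appear further down, the factory is what lets the asynchronous (v2) scanner implementation be selected behind the same interface. A rough sketch of the new call pattern, assembled from the test hunks rather than copied from any single file (the method body, addTuple and close() calls are illustrative, not an exact excerpt):

  import org.apache.hadoop.fs.Path;
  import org.apache.tajo.catalog.TableMeta;
  import org.apache.tajo.conf.TajoConf;
  import org.apache.tajo.storage.*;

  import java.io.IOException;

  public class StorageManagerUsageSketch {
    // Old API (removed by this patch):
    //   StorageManager sm = StorageManager.get(conf, tableDir);
    //   Appender a = StorageManager.getAppender(conf, meta, tablePath);
    //   Scanner s  = StorageManager.getScanner(conf, meta, tablePath);
    //   SeekableScanner ss = (SeekableScanner) StorageManager.getScanner(conf, meta, tablePath);

    public static void writeThenScan(TajoConf conf, Path tableDir, TableMeta meta, Path tablePath)
        throws IOException {
      // The factory hands back the configured AbstractStorageManager implementation.
      AbstractStorageManager sm = StorageManagerFactory.getStorageManager(conf, tableDir);

      // Appenders and scanners are now instance methods; conf is no longer passed per call.
      Appender appender = sm.getAppender(meta, tablePath);
      appender.init();
      // ... appender.addTuple(...) as in the tests ...
      appender.close();

      Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, tablePath);
      scanner.init();
      while (scanner.next() != null) {
        // consume tuples
      }
      scanner.close();

      // Seekable access no longer needs a cast; the factory provides it directly.
      SeekableScanner seekable = StorageManagerFactory.getSeekableScanner(conf, meta, tablePath);
      seekable.init();
      seekable.close();
    }
  }

Centralizing construction in StorageManagerFactory means the tests (and, presumably, the workers) pick up whichever storage manager implementation the configuration selects without further code changes.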

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/style.css
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/style.css b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/style.css
new file mode 100644
index 0000000..628e7bc
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/style.css
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+<style type="text/css" >
+     
+     h5 {}
+     h1 {
+      font-size:25pt;
+      font-weight:bold;
+     }
+     h3 {
+      margin-top:5px;
+      margin-bottom:5px;
+      margin-left: 4px;
+      font-size:15pt;
+      font-weight:bold;
+     }
+     h2 {
+      margin-top:5px;
+      margin-bottom:5px;
+      margin-left:4px;
+      font-size:18pt;
+      font-weight:bold;
+     }
+     h2.line {
+      margin-top:45px;
+      margin-bottom:15px;
+      margin-left:4px;
+      color:#333333;
+      font-size:18pt;
+      font-weight:bold;
+      border-bottom:1px solid #999999;
+     }
+     h2.compactline{
+      margin-top:5px;
+      margin-bottom:5px;
+      margin-left:4px;
+      margin-right:4px;
+      color:#333333;
+      font-size:18pt;
+      font-weight:bold;
+      border-bottom:1px solid #999999;
+     }
+     td {
+      border: 1px solid #999999;
+      padding-left : 15px;
+      padding-top:2px;
+      padding-bottom:2px; 
+      margin : 0px;
+      word-wrap: break-word;
+     }
+     td.long {
+      width:450px;
+      border: 1px solid #999999;
+      padding : 2px; 
+      margin : 0px;
+      text-align:center;
+     }
+     th {
+       border: 1px solid #777777;
+       font-weight:bold;
+       color:#333333;
+       background-color:#cccccc;
+       text-align:left;
+       padding-left:15px;
+       padding-top:3px;
+       padding-bottom:3px;
+     }
+     th.small {
+       font-weight:bold;
+       width:100px;
+     }
+     table {
+      border-collapse:collapse;
+     }
+     table.new {
+      border-collapse:collapse;
+      width:95%;
+      border:1px solid #999999;
+      padding:5px;
+      table-layout:fixed;
+     } 
+     table.noborder {
+      border-collapse:collapse;
+      width:98%;
+      border:none;
+      margin-top:5px;
+     }
+     td.rightborder{
+      padding-left:0px;
+      border-right:1px solid #cccccc;
+      border-top:none;
+      border-left:none;
+      border-bottom:none;
+      text-align:center;
+     }
+     td.noborder{
+      padding-left:0px;
+      border:none;
+      text-align:center;
+     }
+     th.rightbottom{
+      padding-left:0px;
+      border-right:1px solid #cccccc;
+      border-bottom:1px solid #cccccc;
+      border-top:none;
+      border-left:none;
+      background-color:#ffffff;
+      text-align:center;
+     }
+     th.bottom{
+      padding-left:0px;
+      border-right:none;
+      border-bottom:1px solid #cccccc;
+      border-top:none;
+      border-left:none;
+      background-color:#ffffff;
+      text-align:center;
+     }
+     iframe {
+       width:1024px;
+       overflow:hidden;
+       border:0px;
+       padding:0px;
+     }
+     div.tajoinfo {
+       width :350px;
+       height:45px;
+       border:1px solid black;
+       margin-top:80px;
+       margin-bottom:2px;
+       margin-left:80px;
+       margin-right:2px;
+       float:left;
+     }
+     div.tajoimage {
+       width:450px;
+       height:125px;
+       margin:3px;
+       float:left;
+     }
+     div.container {
+       width:860px;
+       margin:auto;
+       overflow:auto;
+     }
+     .topcontainer {
+       border:1px solid green;
+       margin:auto;
+       min-height:400px;
+       overflow:auto;
+     }
+     div.leftbox {
+       width:450px;
+       margin:6px;
+       padding:5px;
+       float:left;
+     }
+     div.bottombox {
+       margin:5px;
+       padding:5px;
+     }
+     div.leftcontent {
+       border:1px solid black;
+       margin:auto;
+       min-height:400px;
+     }
+     div.titlebox {
+       width:inherit;
+       border:2px solid #999999;
+     }
+     div.contentbox {
+       padding-top:5px;
+       padding-bottom:5px;
+       width:inherit;
+       border-left:2px solid #999999;
+       border-right:2px solid #999999;
+       border-bottom:1px solid #999999;
+       border-top:1px solid #999999;
+       overflow:hidden;
+       word-wrap:break-word;
+     }
+     div#tajotitle {
+       height:40px;
+       border:1px solid black;
+       margin-top:5px;
+       margin-bottom:5px;
+     }
+     div.jobbox {
+       width:300px;
+       margin:6px;
+       padding:5px;
+       float:left;
+     }
+     div#bottomtabletitle {
+       width:700px;
+       border:2px solid #999999;
+     }
+     div.center {
+       margin-top:10px;
+       text-align:center;
+     }
+     div.headline {
+       background-color:#999999;
+       padding:3px;
+       text-align:center;
+     }
+     div.headline_2 {
+       background-color:#999999;
+       padding:3px;
+     }
+     div.command {
+       margin:auto;
+       width:600px;
+       height:320px;
+       text-align:center;
+     }
+     textarea.command {
+       margin:3px;
+       width:550px;
+       height:310px;
+     }
+     hr {
+       border:1px solid #999999;
+     }
+     a.headline {
+       color:#ffffff;
+       font-size:13pt;
+       font-weight:bold;
+       text-decoration:none;
+     }
+     a.tablelink {
+       color:#000000;
+       text-decoration:none;
+     }
+     a.tablelink:hover {
+       color:#666666;
+       text-decoration:none;
+     }
+     font.maintitle {
+       margin-left:35px;
+       font-size:35pt;
+       font-weight:bold;
+     }
+     font.subtitle {
+       margin-left:35px;
+       font-size:25pt;
+       font-weight:bold;
+     }
+     ul li.off {
+      color:#ee4444;
+     }
+     ul li.on {
+      color:#33dd33;
+     }
+     div.outbox {
+      margin-top:5px;
+      padding-bottom:5px;
+      border:1px solid #cccccc;
+     }
+     div.outbox_order {
+      margin-top:7px;
+      padding-bottom:5px;
+      border:1px solid #cccccc;
+      width:425px;
+      height:auto;
+      float:left;
+     }
+
+  </style>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
index 5a85a07..d1efc1c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
@@ -21,21 +21,19 @@
  */
 package org.apache.tajo;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.FileUtil;
 
 import java.io.IOException;
-import java.util.UUID;
 
 public class BackendTestingUtil {
 	public final static Schema mockupSchema;
@@ -51,7 +49,7 @@ public class BackendTestingUtil {
   public static void writeTmpTable(TajoConf conf, Path path,
                                    String tableName, boolean writeMeta)
       throws IOException {
-    StorageManager sm = StorageManager.get(conf, path);
+    AbstractStorageManager sm = StorageManagerFactory.getStorageManager(conf, path);
     FileSystem fs = sm.getFileSystem();
     Appender appender;
 
@@ -64,7 +62,7 @@ public class BackendTestingUtil {
     if (writeMeta) {
       FileUtil.writeProto(fs, new Path(tablePath.getParent(), ".meta"), mockupMeta.getProto());
     }
-    appender = StorageManager.getAppender(conf, mockupMeta, tablePath);
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(mockupMeta, tablePath);
     appender.init();
 
     int deptSize = 10000;
@@ -85,28 +83,6 @@ public class BackendTestingUtil {
     writeTmpTable(conf, new Path(parent), tableName, writeMeta);
 	}
 
-  private TajoConf conf;
-  private CatalogService catalog;
-  private SQLAnalyzer analyzer;
-  private LogicalPlanner planner;
-  private LogicalOptimizer optimizer;
-
   public BackendTestingUtil(TajoConf conf) throws IOException {
-    this.conf = conf;
-    this.catalog = new LocalCatalogWrapper(conf);
-    analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
-  }
-
-  public static Path createTmpTestDir() throws IOException {
-    String randomStr = UUID.randomUUID().toString();
-    FileSystem fs = FileSystem.getLocal(new Configuration());
-    Path dir = new Path("target/test-data", randomStr);
-    // Have it cleaned up on exit
-    if (fs.exists(dir)) {
-      fs.delete(dir, true);
-    }
-    return dir;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 88029ea..e3b5fe7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -44,7 +44,6 @@ import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.worker.TajoWorker;
 
 import java.io.*;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.sql.ResultSet;
@@ -189,6 +188,8 @@ public class TajoTestingCluster {
     // Do old style too just to be safe.
     this.conf.set("fs.default.name", defaultFS.getUri().toString());
 
+    this.conf.set(TajoConf.ConfVars.ROOT_DIR.name(), defaultFS.getUri() + "/tajo");
+
     return this.dfsCluster;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
index fd492be..84a5c50 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -22,10 +22,6 @@ import com.google.common.collect.Sets;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
 import org.apache.tajo.BackendTestingUtil;
 import org.apache.tajo.IntegrationTest;
 import org.apache.tajo.TajoTestingCluster;
@@ -33,6 +29,10 @@ import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 import java.util.Set;
@@ -41,7 +41,7 @@ import static org.junit.Assert.*;
 
 @Category(IntegrationTest.class)
 public class TestTajoClient {
-  private static TajoTestingCluster util;
+  private static TajoTestingCluster cluster;
   private static TajoConf conf;
   private static TajoClient tajo;
   private static String TEST_PATH = "target/test-data/"
@@ -50,9 +50,9 @@ public class TestTajoClient {
 
   @BeforeClass
   public static void setUp() throws Exception {
-    util = new TajoTestingCluster();
-    util.startMiniCluster(1);
-    conf = util.getConfiguration();
+    cluster = new TajoTestingCluster();
+    cluster.startMiniCluster(1);
+    conf = cluster.getConfiguration();
     Thread.sleep(3000);
     tajo = new TajoClient(conf);
 
@@ -61,7 +61,7 @@ public class TestTajoClient {
 
   @AfterClass
   public static void tearDown() throws Exception {
-    util.shutdownMiniCluster();
+    cluster.shutdownMiniCluster();
     if(tajo != null) {
       tajo.close();
     }
@@ -114,7 +114,7 @@ public class TestTajoClient {
 
   @Test
   public final void testCreateAndDropExternalTableByExecuteQuery() throws IOException, ServiceException {
-    TajoConf conf = util.getConfiguration();
+    TajoConf conf = cluster.getConfiguration();
     final String tableName = "testCreateAndDropExternalTableByExecuteQuery";
 
     BackendTestingUtil.writeTmpTable(conf, "file:///tmp", tableName, false);
@@ -135,7 +135,7 @@ public class TestTajoClient {
 
   @Test
   public final void testCreateAndDropTableByExecuteQuery() throws IOException, ServiceException {
-    TajoConf conf = util.getConfiguration();
+    TajoConf conf = cluster.getConfiguration();
     final String tableName = "testCreateAndDropTableByExecuteQuery";
 
     assertFalse(tajo.existTable(tableName));
@@ -145,8 +145,8 @@ public class TestTajoClient {
     tajo.updateQuery(tql);
     assertTrue(tajo.existTable(tableName));
 
-    FileSystem hdfs = FileSystem.get(conf);
     Path tablePath = tajo.getTableDesc(tableName).getPath();
+    FileSystem hdfs = tablePath.getFileSystem(conf);
     assertTrue(hdfs.exists(tablePath));
 
     tajo.updateQuery("drop table " + tableName);
@@ -156,7 +156,7 @@ public class TestTajoClient {
 
   @Test
   public final void testDDLByExecuteQuery() throws IOException, ServiceException {
-    TajoConf conf = util.getConfiguration();
+    TajoConf conf = cluster.getConfiguration();
     final String tableName = "testDDLByExecuteQuery";
     BackendTestingUtil.writeTmpTable(conf, "file:///tmp", tableName, false);
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
index ede73c5..66060ce 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
@@ -66,7 +66,7 @@ public class TestGlobalQueryPlanner {
   private static LogicalPlanner logicalPlanner;
   private static LogicalOptimizer optimizer;
   private static QueryId queryId;
-  private static StorageManager sm;
+  private static AbstractStorageManager sm;
 
   @BeforeClass
   public static void setup() throws Exception {
@@ -89,7 +89,7 @@ public class TestGlobalQueryPlanner {
       catalog.registerFunction(funcDesc);
     }
 
-    sm = new StorageManager(util.getConfiguration());
+    sm = StorageManagerFactory.getStorageManager(util.getConfiguration());
     FunctionDesc funcDesc = new FunctionDesc("sumtest", TestSum.class, FunctionType.GENERAL,
         CatalogUtil.newDataTypesWithoutLen(Type.INT4),
         CatalogUtil.newDataTypesWithoutLen(Type.INT4));
@@ -100,7 +100,7 @@ public class TestGlobalQueryPlanner {
     dispatcher.init(conf);
     dispatcher.start();
 
-    planner = new GlobalPlanner(conf, new StorageManager(conf),
+    planner = new GlobalPlanner(conf, sm,
         dispatcher.getEventHandler());
     analyzer = new SQLAnalyzer();
     logicalPlanner = new LogicalPlanner(catalog);
@@ -124,7 +124,7 @@ public class TestGlobalQueryPlanner {
         fs.delete(tablePath.getParent(), true);
       }
       fs.mkdirs(tablePath.getParent());
-      appender = StorageManager.getAppender(conf, meta, tablePath);
+      appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
       appender.init();
       tupleNum = 100;
       for (j = 0; j < tupleNum; j++) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
index c665b44..7572ad5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
@@ -79,7 +79,7 @@ public class TestGlobalQueryOptimizer {
 
     conf = new TajoConf(util.getConfiguration());
     catalog = util.getMiniCatalogCluster().getCatalog();
-    StorageManager sm = new StorageManager(util.getConfiguration());
+    AbstractStorageManager sm = StorageManagerFactory.getStorageManager(util.getConfiguration());
     FunctionDesc funcDesc = new FunctionDesc("sumtest", TestSum.class, FunctionType.GENERAL,
         CatalogUtil.newDataTypesWithoutLen(Type.INT4),
         CatalogUtil.newDataTypesWithoutLen(Type.INT4));
@@ -88,7 +88,7 @@ public class TestGlobalQueryOptimizer {
 
     AsyncDispatcher dispatcher = new AsyncDispatcher();
 
-    planner = new GlobalPlanner(conf, new StorageManager(conf),
+    planner = new GlobalPlanner(conf, sm,
         dispatcher.getEventHandler());
     analyzer = new SQLAnalyzer();
     logicalPlanner = new LogicalPlanner(catalog);
@@ -112,7 +112,7 @@ public class TestGlobalQueryOptimizer {
         fs.delete(tablePath.getParent(), true);
       }
       fs.mkdirs(tablePath.getParent());
-      appender = StorageManager.getAppender(conf, meta, tablePath);
+      appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
       appender.init();
       tupleNum = 100;
       for (j = 0; j < tupleNum; j++) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index 97459f9..b714981 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -54,7 +54,7 @@ public class TestBNLJoinExec {
   private CatalogService catalog;
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
-  private StorageManager sm;
+  private AbstractStorageManager sm;
   private Path testDir;
 
   private static int OUTER_TUPLE_NUM = 1000;
@@ -69,7 +69,7 @@ public class TestBNLJoinExec {
     catalog = util.startCatalogCluster().getCatalog();
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     conf = util.getConfiguration();
-    sm = StorageManager.get(conf, testDir);
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
 
     Schema schema = new Schema();
     schema.addColumn("managerId", Type.INT4);
@@ -79,7 +79,7 @@ public class TestBNLJoinExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
     appender.init();
     Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
     for (int i = 0; i < OUTER_TUPLE_NUM; i++) {
@@ -100,7 +100,7 @@ public class TestBNLJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
     appender.init();
     tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
     for (int i = 1; i < INNER_TUPLE_NUM; i += 2) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index 1c651f6..8021882 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -59,7 +59,7 @@ public class TestBSTIndexExec {
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
   private LogicalOptimizer optimizer;
-  private StorageManager sm;
+  private AbstractStorageManager sm;
   private Schema idxSchema;
   private TupleComparator comp;
   private BSTIndex.BSTIndexWriter writer;
@@ -82,7 +82,7 @@ public class TestBSTIndexExec {
     catalog = util.getMiniCatalogCluster().getCatalog();
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestPhysicalPlanner");
-    sm = StorageManager.get(conf, workDir);
+    sm = StorageManagerFactory.getStorageManager(conf, workDir);
 
     idxPath = new Path(workDir, "test.idx");
 
@@ -108,7 +108,7 @@ public class TestBSTIndexExec {
     fs = tablePath.getFileSystem(conf);
     fs.mkdirs(tablePath.getParent());
 
-    FileAppender appender = (FileAppender)StorageManager.getAppender(conf, meta, tablePath);
+    FileAppender appender = (FileAppender)StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
     appender.init();
     Tuple tuple = new VTuple(meta.getSchema().getColumnNum());
     for (int i = 0; i < 10000; i++) {
@@ -150,7 +150,9 @@ public class TestBSTIndexExec {
 
   @Test
   public void testEqual() throws Exception {
-    
+    if(conf.getBoolean("tajo.storage.manager.v2", false)) {
+      return;
+    }
     this.rndKey = rnd.nextInt(250);
     final String QUERY = "select * from employee where managerId = " + rndKey;
     
@@ -180,7 +182,7 @@ public class TestBSTIndexExec {
   }
 
   private class TmpPlanner extends PhysicalPlannerImpl {
-    public TmpPlanner(TajoConf conf, StorageManager sm) {
+    public TmpPlanner(TajoConf conf, AbstractStorageManager sm) {
       super(conf, sm);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index 0fc3773..01fd370 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -51,7 +51,7 @@ public class TestExternalSortExec {
   private CatalogService catalog;
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
-  private StorageManager sm;
+  private AbstractStorageManager sm;
   private Path testDir;
 
 
@@ -67,7 +67,7 @@ public class TestExternalSortExec {
     util = new TajoTestingCluster();
     catalog = util.startCatalogCluster().getCatalog();
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    sm = StorageManager.get(conf, testDir);
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
 
     Schema schema = new Schema();
     schema.addColumn("managerId", Type.INT4);
@@ -76,7 +76,7 @@ public class TestExternalSortExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
     appender.enableStats();
     appender.init();
     Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
@@ -138,6 +138,7 @@ public class TestExternalSortExec {
     int cnt = 0;
     exec.init();
     long start = System.currentTimeMillis();
+
     while ((tuple = exec.next()) != null) {
       curVal = tuple.get(0);
       if (preVal != null) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index 8d80d9e..886dddc 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -51,7 +51,7 @@ public class TestHashAntiJoinExec {
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
   private LogicalOptimizer optimizer;
-  private StorageManager sm;
+  private AbstractStorageManager sm;
   private Path testDir;
 
   private TableDesc employee;
@@ -64,7 +64,7 @@ public class TestHashAntiJoinExec {
     catalog = util.startCatalogCluster().getCatalog();
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     conf = util.getConfiguration();
-    sm = StorageManager.get(conf, testDir);
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
 
     Schema employeeSchema = new Schema();
     employeeSchema.addColumn("managerId", Type.INT4);
@@ -75,7 +75,7 @@ public class TestHashAntiJoinExec {
     TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema,
         StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
     appender.init();
     Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
 
@@ -100,7 +100,7 @@ public class TestHashAntiJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
     appender.init();
     tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
     for (int i = 1; i < 10; i += 2) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index e270df3..cf89cf8 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -53,7 +53,7 @@ public class TestHashJoinExec {
   private CatalogService catalog;
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
-  private StorageManager sm;
+  private AbstractStorageManager sm;
   private Path testDir;
 
   private TableDesc employee;
@@ -66,7 +66,7 @@ public class TestHashJoinExec {
     catalog = util.startCatalogCluster().getCatalog();
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     conf = util.getConfiguration();
-    sm = StorageManager.get(conf, testDir);
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
 
     Schema employeeSchema = new Schema();
     employeeSchema.addColumn("managerId", Type.INT4);
@@ -77,7 +77,7 @@ public class TestHashJoinExec {
     TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema,
         StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
     appender.init();
     Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
     for (int i = 0; i < 10; i++) {
@@ -99,7 +99,7 @@ public class TestHashJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
     appender.init();
     tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
     for (int i = 1; i < 10; i += 2) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index 317c1f2..d986a8f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -51,7 +51,7 @@ public class TestHashSemiJoinExec {
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
   private LogicalOptimizer optimizer;
-  private StorageManager sm;
+  private AbstractStorageManager sm;
   private Path testDir;
 
   private TableDesc employee;
@@ -64,7 +64,7 @@ public class TestHashSemiJoinExec {
     catalog = util.startCatalogCluster().getCatalog();
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     conf = util.getConfiguration();
-    sm = StorageManager.get(conf, testDir);
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
 
     Schema employeeSchema = new Schema();
     employeeSchema.addColumn("managerId", Type.INT4);
@@ -75,7 +75,7 @@ public class TestHashSemiJoinExec {
     TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema,
         StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
     appender.init();
     Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
 
@@ -100,7 +100,7 @@ public class TestHashSemiJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
     appender.init();
     tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
     // make 27 tuples

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index 776882b..e77a734 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -55,7 +55,7 @@ public class TestMergeJoinExec {
   private CatalogService catalog;
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
-  private StorageManager sm;
+  private AbstractStorageManager sm;
 
   private TableDesc employee;
   private TableDesc people;
@@ -68,7 +68,7 @@ public class TestMergeJoinExec {
     Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     conf = util.getConfiguration();
     FileSystem fs = testDir.getFileSystem(conf);
-    sm = StorageManager.get(conf, testDir);
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
 
     Schema employeeSchema = new Schema();
     employeeSchema.addColumn("managerId", Type.INT4);
@@ -79,7 +79,7 @@ public class TestMergeJoinExec {
     TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema,
         StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
     appender.init();
     Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
     for (int i = 0; i < 10; i++) {
@@ -108,7 +108,7 @@ public class TestMergeJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
     appender.init();
     tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
     for (int i = 1; i < 10; i += 2) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
index 9289dc9..2d82f6c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -53,7 +53,7 @@ public class TestNLJoinExec {
   private CatalogService catalog;
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
-  private StorageManager sm;
+  private AbstractStorageManager sm;
   private Path testDir;
 
   private TableDesc employee;
@@ -65,7 +65,7 @@ public class TestNLJoinExec {
     catalog = util.startCatalogCluster().getCatalog();
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     conf = util.getConfiguration();
-    sm = StorageManager.get(conf, testDir);
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
 
     Schema schema = new Schema();
     schema.addColumn("managerId", Type.INT4);
@@ -75,7 +75,7 @@ public class TestNLJoinExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
     appender.init();
     Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
     for (int i = 0; i < 50; i++) {
@@ -99,7 +99,7 @@ public class TestNLJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
     appender.init();
     tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
     for (int i = 1; i < 50; i += 2) {
@@ -151,7 +151,8 @@ public class TestNLJoinExec {
 
     int i = 0;
     exec.init();
-    while (exec.next() != null) {
+    Tuple tuple = null;
+    while ( (tuple = exec.next()) != null) {
       i++;
     }
     exec.close();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 75e3b1e..5358d3a 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -69,7 +69,7 @@ public class TestPhysicalPlanner {
   private static SQLAnalyzer analyzer;
   private static LogicalPlanner planner;
   private static LogicalOptimizer optimizer;
-  private static StorageManager sm;
+  private static AbstractStorageManager sm;
   private static Path testDir;
 
   private static TableDesc employee = null;
@@ -82,7 +82,7 @@ public class TestPhysicalPlanner {
     util.startCatalogCluster();
     conf = util.getConfiguration();
     testDir = CommonTestingUtil.getTestDir("target/test-data/TestPhysicalPlanner");
-    sm = StorageManager.get(conf, testDir);
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
     catalog = util.getMiniCatalogCluster().getCatalog();
     for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
       catalog.registerFunction(funcDesc);
@@ -107,7 +107,7 @@ public class TestPhysicalPlanner {
 
 
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
     appender.init();
     Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
     for (int i = 0; i < 100; i++) {
@@ -123,7 +123,7 @@ public class TestPhysicalPlanner {
 
     Path scorePath = new Path(testDir, "score");
     TableMeta scoreMeta = CatalogUtil.newTableMeta(scoreSchema, StoreType.CSV, new Options());
-    appender = StorageManager.getAppender(conf, scoreMeta, scorePath);
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(scoreMeta, scorePath);
     appender.init();
     score = new TableDescImpl("score", scoreMeta, scorePath);
     tuple = new VTuple(score.getMeta().getSchema().getColumnNum());
@@ -189,7 +189,7 @@ public class TestPhysicalPlanner {
     optimizer.optimize(plan);
 
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
 
     Tuple tuple;
@@ -372,7 +372,7 @@ public class TestPhysicalPlanner {
     exec.next();
     exec.close();
 
-    Scanner scanner = StorageManager.getScanner(conf, outputMeta, ctx.getOutputPath());
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(outputMeta, ctx.getOutputPath());
     scanner.init();
     Tuple tuple;
     int i = 0;
@@ -412,7 +412,7 @@ public class TestPhysicalPlanner {
     exec.next();
     exec.close();
 
-    Scanner scanner = StorageManager.getScanner(conf, outputMeta, ctx.getOutputPath());
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(outputMeta, ctx.getOutputPath());
     scanner.init();
     Tuple tuple;
     int i = 0;
@@ -787,8 +787,8 @@ public class TestPhysicalPlanner {
     reader.open();
     Path outputPath = StorageUtil.concatPath(workDir, "output", "output");
     TableMeta meta = CatalogUtil.newTableMeta(rootNode.getOutSchema(), StoreType.CSV, new Options());
-    SeekableScanner scanner = (SeekableScanner)
-        StorageManager.getScanner(conf, meta, outputPath);
+    SeekableScanner scanner =
+        StorageManagerFactory.getSeekableScanner(conf, meta, outputPath);
     scanner.init();
 
     int cnt = 0;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index 0151cb3..06c5bb7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -50,7 +50,7 @@ public class TestSortExec {
   private static SQLAnalyzer analyzer;
   private static LogicalPlanner planner;
   private static LogicalOptimizer optimizer;
-  private static StorageManager sm;
+  private static AbstractStorageManager sm;
   private static TajoTestingCluster util;
   private static Path workDir;
   private static Path tablePath;
@@ -64,7 +64,7 @@ public class TestSortExec {
     util = new TajoTestingCluster();
     catalog = util.startCatalogCluster().getCatalog();
     workDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    sm = StorageManager.get(conf, workDir);
+    sm = StorageManagerFactory.getStorageManager(conf, workDir);
 
     Schema schema = new Schema();
     schema.addColumn("managerId", Type.INT4);
@@ -76,7 +76,7 @@ public class TestSortExec {
     tablePath = StorageUtil.concatPath(workDir, "employee", "table1");
     sm.getFileSystem().mkdirs(tablePath.getParent());
 
-    Appender appender = StorageManager.getAppender(conf, employeeMeta, tablePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, tablePath);
     appender.init();
     Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
     for (int i = 0; i < 100; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
index ba7d36b..d006679 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
@@ -22,22 +22,19 @@
 package org.apache.tajo.engine.query;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.*;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
 import org.apache.tajo.IntegrationTest;
 import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.statistics.TableStat;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.*;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 import java.sql.ResultSetMetaData;
@@ -49,8 +46,8 @@ import static org.junit.Assert.*;
 public class TestResultSetImpl {
   private static TajoTestingCluster util;
   private static TajoConf conf;
-  private static StorageManager sm;
   private static TableDesc desc;
+  private static AbstractStorageManager sm;
   private static TableMeta scoreMeta;
 
   @BeforeClass
@@ -58,7 +55,7 @@ public class TestResultSetImpl {
     util = new TajoTestingCluster();
     util.startMiniCluster(3);
     conf = util.getConfiguration();
-    sm = new StorageManager(conf);
+    sm = StorageManagerFactory.getStorageManager(conf);
 
     Schema scoreSchema = new Schema();
     scoreSchema.addColumn("deptname", Type.TEXT);
@@ -68,7 +65,7 @@ public class TestResultSetImpl {
 
     Path p = sm.getTablePath("score");
     sm.getFileSystem().mkdirs(p);
-    Appender appender = StorageManager.getAppender(conf, scoreMeta, new Path(p, "score"));
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(scoreMeta, new Path(p, "score"));
     appender.init();
     int deptSize = 100;
     int tupleNum = 10000;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index a67dd26..b5ce437 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -33,7 +33,8 @@ import org.apache.tajo.engine.planner.LogicalPlanner;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.LogicalRootNode;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.StorageManagerFactory;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -68,7 +69,7 @@ public class TestExecutionBlockCursor {
     logicalPlanner = new LogicalPlanner(catalog);
     optimizer = new LogicalOptimizer();
 
-    StorageManager sm  = new StorageManager(conf);
+    AbstractStorageManager sm  = StorageManagerFactory.getStorageManager(conf);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
index e44ca99..c070c4d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
@@ -19,13 +19,9 @@
 package org.apache.tajo.storage;
 
 import com.google.common.collect.Sets;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
@@ -35,10 +31,14 @@ import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.proto.CatalogProtos.TableProto;
 import org.apache.tajo.catalog.statistics.TableStat;
 import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.util.FileUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Set;
@@ -46,20 +46,20 @@ import java.util.Set;
 import static org.junit.Assert.assertEquals;
 
 public class TestRowFile {
-  private TajoTestingCluster util;
-  private Configuration conf;
+  private TajoTestingCluster cluster;
+  private TajoConf conf;
 
   @Before
   public void setup() throws Exception {
-    util = new TajoTestingCluster();
-    conf = util.getConfiguration();
+    cluster = new TajoTestingCluster();
+    conf = cluster.getConfiguration();
     conf.setInt(ConfVars.RAWFILE_SYNC_INTERVAL.varname, 100);
-    util.startMiniDFSCluster(1);
+    cluster.startMiniDFSCluster(1);
   }
 
   @After
   public void teardown() throws Exception {
-    util.shutdownMiniDFSCluster();
+    cluster.shutdownMiniDFSCluster();
   }
 
   @Test
@@ -71,15 +71,18 @@ public class TestRowFile {
 
     TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.ROWFILE);
 
-    Path tablePath = new Path("hdfs:///test");
+    AbstractStorageManager sm = StorageManagerFactory.getStorageManager(conf,
+        new Path(conf.get(TajoConf.ConfVars.ROOT_DIR.name())));
+
+    Path tablePath = new Path("/test");
     Path metaPath = new Path(tablePath, ".meta");
     Path dataPath = new Path(tablePath, "test.tbl");
-    FileSystem fs = tablePath.getFileSystem(conf);
+    FileSystem fs = sm.getFileSystem();
     fs.mkdirs(tablePath);
 
-    FileUtil.writeProto(util.getDefaultFileSystem(), metaPath, meta.getProto());
+    FileUtil.writeProto(fs, metaPath, meta.getProto());
 
-    Appender appender = StorageManager.getAppender(conf, meta, dataPath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, dataPath);
     appender.enableStats();
     appender.init();
 
@@ -96,7 +99,6 @@ public class TestRowFile {
       tuple.put(2, stringDatum);
       appender.addTuple(tuple);
       idSet.add(i+1);
-//      System.out.println(tuple.toString());
     }
 
     long end = System.currentTimeMillis();
@@ -105,21 +107,20 @@ public class TestRowFile {
     TableStat stat = appender.getStats();
     assertEquals(tupleNum, stat.getNumRows().longValue());
 
-    System.out.println("append time: " + (end-start));
+    System.out.println("append time: " + (end - start));
 
     FileStatus file = fs.getFileStatus(dataPath);
     TableProto proto = (TableProto) FileUtil.loadProto(
-        util.getDefaultFileSystem(), metaPath, TableProto.getDefaultInstance());
+        cluster.getDefaultFileSystem(), metaPath, TableProto.getDefaultInstance());
     meta = new TableMetaImpl(proto);
     Fragment fragment = new Fragment("test.tbl", dataPath, meta, 0, file.getLen());
 
     int tupleCnt = 0;
     start = System.currentTimeMillis();
-    Scanner scanner = StorageManager.getScanner(conf, meta, fragment);
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, fragment);
     scanner.init();
     while ((tuple=scanner.next()) != null) {
       tupleCnt++;
-//      System.out.println(tuple.toString());
     }
     scanner.close();
     end = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index cf1e9ae..4f2795b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -64,7 +64,7 @@ public class TestRangeRetrieverHandler {
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
   private LogicalOptimizer optimizer;
-  private StorageManager sm;
+  private AbstractStorageManager sm;
   private Schema schema;
   private static int TEST_TUPLE = 10000;
   private FileSystem fs;
@@ -78,7 +78,7 @@ public class TestRangeRetrieverHandler {
     fs = testDir.getFileSystem(conf);
     util.startCatalogCluster();
     catalog = util.getMiniCatalogCluster().getCatalog();
-    sm = StorageManager.get(conf, testDir);
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
 
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
@@ -108,7 +108,7 @@ public class TestRangeRetrieverHandler {
 
     Path tableDir = StorageUtil.concatPath(testDir, "testGet", "table.csv");
     fs.mkdirs(tableDir.getParent());
-    Appender appender = sm.getAppender(conf, employeeMeta, tableDir);
+    Appender appender = sm.getAppender(employeeMeta, tableDir);
     appender.init();
 
     Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
@@ -161,8 +161,10 @@ public class TestRangeRetrieverHandler {
     BSTIndex.BSTIndexReader reader = bst.getIndexReader(
         new Path(testDir, "output/index"), keySchema, comp);
     reader.open();
-    SeekableScanner scanner = (SeekableScanner)
-        sm.getScanner(conf, employeeMeta, StorageUtil.concatPath(testDir, "output", "output"));
+
+    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, employeeMeta,
+        StorageUtil.concatPath(testDir, "output", "output"));
+
     scanner.init();
     int cnt = 0;
     while(scanner.next() != null) {
@@ -220,7 +222,7 @@ public class TestRangeRetrieverHandler {
     TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
     Path tablePath = StorageUtil.concatPath(testDir, "testGetFromDescendingOrder", "table.csv");
     fs.mkdirs(tablePath.getParent());
-    Appender appender = sm.getAppender(conf, meta, tablePath);
+    Appender appender = sm.getAppender(meta, tablePath);
     appender.init();
     Tuple tuple = new VTuple(meta.getSchema().getColumnNum());
     for (int i = (TEST_TUPLE - 1); i >= 0 ; i--) {
@@ -271,8 +273,8 @@ public class TestRangeRetrieverHandler {
     BSTIndex.BSTIndexReader reader = bst.getIndexReader(
         new Path(testDir, "output/index"), keySchema, comp);
     reader.open();
-    SeekableScanner scanner = (SeekableScanner) StorageManager.getScanner(
-        conf, meta, StorageUtil.concatPath(testDir, "output", "output"));
+    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta,
+        StorageUtil.concatPath(testDir, "output", "output"));
     scanner.init();
     int cnt = 0;
     while(scanner.next() != null) {
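
The same migration shows up here for appenders and seekable scanners: the storage manager is
obtained once from StorageManagerFactory, and the per-call Configuration argument goes away. A
minimal sketch under the same assumptions as the test above (a TajoConf 'conf', a test directory
'testDir', a TableMeta 'meta' and an output Path 'outputPath'; the lifecycle comments are
assumptions, not part of this patch):

  AbstractStorageManager sm = StorageManagerFactory.getStorageManager(conf, testDir);

  Appender appender = sm.getAppender(meta, outputPath);  // was: sm.getAppender(conf, meta, outputPath)
  appender.init();
  // appender.addTuple(tuple) per row, then close, as in the test setup
  appender.close();

  SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, outputPath);
  scanner.init();
  while (scanner.next() != null) {
    // count or inspect tuples
  }
  scanner.close();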

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
new file mode 100644
index 0000000..2b59ecb
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
@@ -0,0 +1,669 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.net.util.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.TableMetaImpl;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.v2.StorageManagerV2;
+import org.apache.tajo.util.Bytes;
+import org.apache.tajo.util.FileUtil;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+public abstract class AbstractStorageManager {
+  private final Log LOG = LogFactory.getLog(AbstractStorageManager.class);
+
+  protected final TajoConf conf;
+  protected final FileSystem fs;
+  protected final Path baseDir;
+  protected final Path tableBaseDir;
+  protected final boolean blocksMetadataEnabled;
+
+  /**
+   * Cache of scanner handlers for each storage type.
+   */
+  protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE
+      = new ConcurrentHashMap<String, Class<? extends Scanner>>();
+
+  /**
+   * Cache of appender handlers for each storage type.
+   */
+  protected static final Map<String, Class<? extends FileAppender>> APPENDER_HANDLER_CACHE
+      = new ConcurrentHashMap<String, Class<? extends FileAppender>>();
+
+  /**
+   * Cache of constructors for each class. Pins the classes so they
+   * can't be garbage collected while the cache holds a reference to them.
+   */
+  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
+      new ConcurrentHashMap<Class<?>, Constructor<?>>();
+
+  public abstract Scanner getScanner(TableMeta meta, Fragment fragment,
+                                   Schema target) throws IOException;
+
+  protected AbstractStorageManager(TajoConf conf) throws IOException {
+    this.conf = conf;
+    this.baseDir = new Path(conf.getVar(TajoConf.ConfVars.ROOT_DIR));
+    this.tableBaseDir = TajoConf.getWarehousePath(conf);
+    this.fs = baseDir.getFileSystem(conf);
+    this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
+        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+    if (!this.blocksMetadataEnabled)
+      LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
+  }
+
+  public Scanner getScanner(TableMeta meta, Path path)
+      throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    FileStatus status = fs.getFileStatus(path);
+    Fragment fragment = new Fragment(path.getName(), path, meta, 0, status.getLen());
+    return getScanner(meta, fragment);
+  }
+
+  public Scanner getScanner(TableMeta meta, Fragment fragment)
+      throws IOException {
+    return getScanner(meta, fragment, meta.getSchema());
+  }
+
+  public FileSystem getFileSystem() {
+    return this.fs;
+  }
+
+  public Path getBaseDir() {
+    return this.baseDir;
+  }
+
+  public Path getTableBaseDir() {
+    return this.tableBaseDir;
+  }
+
+  public void delete(Path tablePath) throws IOException {
+    FileSystem fs = tablePath.getFileSystem(conf);
+    fs.delete(tablePath, true);
+  }
+
+  public boolean exists(Path path) throws IOException {
+    FileSystem fileSystem = path.getFileSystem(conf);
+    return fileSystem.exists(path);
+  }
+
+  /**
+   * This method deletes only data contained in the given path.
+   *
+   * @param path The path in which data are deleted.
+   * @throws IOException
+   */
+  public void deleteData(Path path) throws IOException {
+    FileSystem fileSystem = path.getFileSystem(conf);
+    FileStatus[] fileLists = fileSystem.listStatus(path);
+    for (FileStatus status : fileLists) {
+      fileSystem.delete(status.getPath(), true);
+    }
+  }
+
+  public Path getTablePath(String tableName) {
+    return new Path(tableBaseDir, tableName);
+  }
+
+  public Appender getAppender(TableMeta meta, Path path)
+      throws IOException {
+    Appender appender;
+
+    Class<? extends FileAppender> appenderClass;
+
+    String handlerName = meta.getStoreType().name().toLowerCase();
+    appenderClass = APPENDER_HANDLER_CACHE.get(handlerName);
+    if (appenderClass == null) {
+      appenderClass = conf.getClass(
+          String.format("tajo.storage.appender-handler.%s.class",
+              meta.getStoreType().name().toLowerCase()), null,
+          FileAppender.class);
+      APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
+    }
+
+    if (appenderClass == null) {
+      throw new IOException("Unknown Storage Type: " + meta.getStoreType());
+    }
+
+    appender = newAppenderInstance(appenderClass, conf, meta, path);
+
+    return appender;
+  }
+
+
+  public TableMeta getTableMeta(Path tablePath) throws IOException {
+    TableMeta meta;
+
+    FileSystem fs = tablePath.getFileSystem(conf);
+    Path tableMetaPath = new Path(tablePath, ".meta");
+    if (!fs.exists(tableMetaPath)) {
+      throw new FileNotFoundException(".meta file not found in " + tablePath.toString());
+    }
+
+    FSDataInputStream tableMetaIn = fs.open(tableMetaPath);
+
+    CatalogProtos.TableProto tableProto = (CatalogProtos.TableProto) FileUtil.loadProto(tableMetaIn,
+        CatalogProtos.TableProto.getDefaultInstance());
+    meta = new TableMetaImpl(tableProto);
+
+    return meta;
+  }
+
+  public Fragment[] split(String tableName) throws IOException {
+    Path tablePath = new Path(tableBaseDir, tableName);
+    return split(tableName, tablePath, fs.getDefaultBlockSize());
+  }
+
+  public Fragment[] split(String tableName, long fragmentSize) throws IOException {
+    Path tablePath = new Path(tableBaseDir, tableName);
+    return split(tableName, tablePath, fragmentSize);
+  }
+
+  public Fragment[] splitBroadcastTable(Path tablePath) throws IOException {
+    FileSystem fs = tablePath.getFileSystem(conf);
+    TableMeta meta = getTableMeta(tablePath);
+    List<Fragment> listTablets = new ArrayList<Fragment>();
+    Fragment tablet;
+
+    FileStatus[] fileLists = fs.listStatus(tablePath);
+    for (FileStatus file : fileLists) {
+      tablet = new Fragment(tablePath.getName(), file.getPath(), meta, 0, file.getLen());
+      listTablets.add(tablet);
+    }
+
+    Fragment[] tablets = new Fragment[listTablets.size()];
+    listTablets.toArray(tablets);
+
+    return tablets;
+  }
+
+  public Fragment[] split(Path tablePath) throws IOException {
+    FileSystem fs = tablePath.getFileSystem(conf);
+    return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
+  }
+
+  public Fragment[] split(String tableName, Path tablePath) throws IOException {
+    return split(tableName, tablePath, fs.getDefaultBlockSize());
+  }
+
+  private Fragment[] split(String tableName, Path tablePath, long size)
+      throws IOException {
+    FileSystem fs = tablePath.getFileSystem(conf);
+
+    TableMeta meta = getTableMeta(tablePath);
+    long defaultBlockSize = size;
+    List<Fragment> listTablets = new ArrayList<Fragment>();
+    Fragment tablet;
+
+    FileStatus[] fileLists = fs.listStatus(tablePath);
+    for (FileStatus file : fileLists) {
+      long remainFileSize = file.getLen();
+      long start = 0;
+      if (remainFileSize > defaultBlockSize) {
+        while (remainFileSize > defaultBlockSize) {
+          tablet = new Fragment(tableName, file.getPath(), meta, start, defaultBlockSize);
+          listTablets.add(tablet);
+          start += defaultBlockSize;
+          remainFileSize -= defaultBlockSize;
+        }
+        listTablets.add(new Fragment(tableName, file.getPath(), meta, start, remainFileSize));
+      } else {
+        listTablets.add(new Fragment(tableName, file.getPath(), meta, 0, remainFileSize));
+      }
+    }
+
+    Fragment[] tablets = new Fragment[listTablets.size()];
+    listTablets.toArray(tablets);
+
+    return tablets;
+  }
+
+  public static Fragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
+                                   Path tablePath, long size)
+      throws IOException {
+    FileSystem fs = tablePath.getFileSystem(conf);
+
+    long defaultBlockSize = size;
+    List<Fragment> listTablets = new ArrayList<Fragment>();
+    Fragment tablet;
+
+    FileStatus[] fileLists = fs.listStatus(tablePath);
+    for (FileStatus file : fileLists) {
+      long remainFileSize = file.getLen();
+      long start = 0;
+      if (remainFileSize > defaultBlockSize) {
+        while (remainFileSize > defaultBlockSize) {
+          tablet = new Fragment(tableName, file.getPath(), meta, start, defaultBlockSize);
+          listTablets.add(tablet);
+          start += defaultBlockSize;
+          remainFileSize -= defaultBlockSize;
+        }
+        listTablets.add(new Fragment(tableName, file.getPath(), meta, start, remainFileSize));
+      } else {
+        listTablets.add(new Fragment(tableName, file.getPath(), meta, 0, remainFileSize));
+      }
+    }
+
+    Fragment[] tablets = new Fragment[listTablets.size()];
+    listTablets.toArray(tablets);
+
+    return tablets;
+  }
+
+  public void writeTableMeta(Path tableRoot, TableMeta meta)
+      throws IOException {
+    FileSystem fs = tableRoot.getFileSystem(conf);
+    FSDataOutputStream out = fs.create(new Path(tableRoot, ".meta"));
+    FileUtil.writeProto(out, meta.getProto());
+    out.flush();
+    out.close();
+  }
+
+  public long calculateSize(Path tablePath) throws IOException {
+    FileSystem fs = tablePath.getFileSystem(conf);
+    long totalSize = 0;
+
+    if (fs.exists(tablePath)) {
+      for (FileStatus status : fs.listStatus(tablePath)) {
+        totalSize += status.getLen();
+      }
+    }
+
+    return totalSize;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+  // FileInputFormat Area
+  /////////////////////////////////////////////////////////////////////////////
+
+  private static final PathFilter hiddenFileFilter = new PathFilter() {
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+
+  /**
+   * Proxy PathFilter that accepts a path only if all filters given in the
+   * constructor do. Used by listStatus() to apply the built-in
+   * hiddenFileFilter together with a user provided one (if any).
+   */
+  private static class MultiPathFilter implements PathFilter {
+    private List<PathFilter> filters;
+
+    public MultiPathFilter(List<PathFilter> filters) {
+      this.filters = filters;
+    }
+
+    public boolean accept(Path path) {
+      for (PathFilter filter : filters) {
+        if (!filter.accept(path)) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
+   * List input directories.
+   * Subclasses may override to, e.g., select only files matching a regular
+   * expression.
+   *
+   * @return array of FileStatus objects
+   * @throws IOException if an input path does not exist or matches no files.
+   */
+  protected List<FileStatus> listStatus(Path path) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    Path[] dirs = new Path[]{path};
+    if (dirs.length == 0) {
+      throw new IOException("No input paths specified in job");
+    }
+
+    List<IOException> errors = new ArrayList<IOException>();
+
+    // creates a MultiPathFilter with the hiddenFileFilter and the
+    // user provided one (if any).
+    List<PathFilter> filters = new ArrayList<PathFilter>();
+    filters.add(hiddenFileFilter);
+
+    PathFilter inputFilter = new MultiPathFilter(filters);
+
+    for (int i = 0; i < dirs.length; ++i) {
+      Path p = dirs[i];
+
+      FileSystem fs = p.getFileSystem(conf);
+      FileStatus[] matches = fs.globStatus(p, inputFilter);
+      if (matches == null) {
+        errors.add(new IOException("Input path does not exist: " + p));
+      } else if (matches.length == 0) {
+        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
+      } else {
+        for (FileStatus globStat : matches) {
+          if (globStat.isDirectory()) {
+            for (FileStatus stat : fs.listStatus(globStat.getPath(),
+                inputFilter)) {
+              result.add(stat);
+            }
+          } else {
+            result.add(globStat);
+          }
+        }
+      }
+    }
+
+    if (!errors.isEmpty()) {
+      throw new InvalidInputException(errors);
+    }
+    LOG.info("Total input paths to process : " + result.size());
+    return result;
+  }
+
+  /**
+   * Get the lower bound on split size imposed by the format.
+   *
+   * @return the number of bytes of the minimal split for this format
+   */
+  protected long getFormatMinSplitSize() {
+    return 1;
+  }
+
+  /**
+   * Is the given file splittable? Usually true, but if the file is
+   * stream compressed, it will not be.
+   * <p/>
+   * Subclasses can override this and return <code>false</code> to ensure that
+   * individual input files are never split up, so that scanners process entire files.
+   *
+   * @param filename the file name to check
+   * @return true if the file is splittable; false otherwise
+   */
+  protected boolean isSplittable(TableMeta meta, Path filename) throws IOException {
+    Scanner scanner = getScanner(meta, filename);
+    return scanner.isSplittable();
+  }
+
+  @Deprecated
+  protected long computeSplitSize(long blockSize, long minSize,
+                                  long maxSize) {
+    return Math.max(minSize, Math.min(maxSize, blockSize));
+  }
+
+  @Deprecated
+  private static final double SPLIT_SLOP = 1.1;   // 10% slop
+
+  @Deprecated
+  protected int getBlockIndex(BlockLocation[] blkLocations,
+                              long offset) {
+    for (int i = 0; i < blkLocations.length; i++) {
+      // is the offset inside this block?
+      if ((blkLocations[i].getOffset() <= offset) &&
+          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
+        return i;
+      }
+    }
+    BlockLocation last = blkLocations[blkLocations.length - 1];
+    long fileLength = last.getOffset() + last.getLength() - 1;
+    throw new IllegalArgumentException("Offset " + offset +
+        " is outside of file (0.." +
+        fileLength + ")");
+  }
+
+  /**
+   * A factory method that makes a split for this class. It can be overridden
+   * by subclasses to produce subtypes.
+   */
+  protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length) {
+    return new Fragment(fragmentId, file, meta, start, length);
+  }
+
+  protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, BlockLocation blockLocation,
+                               int[] diskIds) throws IOException {
+    return new Fragment(fragmentId, file, meta, blockLocation, diskIds);
+  }
+
+  // For non-splittable files, e.g. gzip-compressed text files.
+  protected Fragment makeNonSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
+                                  BlockLocation[] blkLocations) throws IOException {
+
+    Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
+    for (BlockLocation blockLocation : blkLocations) {
+      for (String host : blockLocation.getHosts()) {
+        if (hostsBlockMap.containsKey(host)) {
+          hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
+        } else {
+          hostsBlockMap.put(host, 1);
+        }
+      }
+    }
+
+    List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
+    Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
+
+      @Override
+      public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
+        return v1.getValue().compareTo(v2.getValue());
+      }
+    });
+
+    String[] hosts = new String[blkLocations[0].getHosts().length];
+    int[] hostsBlockCount = new int[blkLocations[0].getHosts().length];
+
+    for (int i = 0; i < hosts.length; i++) {
+      Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
+      hosts[i] = entry.getKey();
+      hostsBlockCount[i] = entry.getValue();
+    }
+    return new Fragment(fragmentId, file, meta, start, length, hosts, hostsBlockCount);
+  }
+
+  /**
+   * Get the maximum split size.
+   *
+   * @return the maximum number of bytes a split can include
+   */
+  @Deprecated
+  public static long getMaxSplitSize() {
+    // TODO - to be configurable
+    return 536870912L;
+  }
+
+  /**
+   * Get the minimum split size
+   *
+   * @return the minimum number of bytes that can be in a split
+   */
+  @Deprecated
+  public static long getMinSplitSize() {
+    // TODO - to be configurable
+    return 67108864L;
+  }
+
+  /**
+   * Convert VolumeIds into integer disk ids.
+   */
+  private int[] getDiskIds(VolumeId[] volumeIds) {
+    int[] diskIds = new int[volumeIds.length];
+    for (int i = 0; i < volumeIds.length; i++) {
+      int diskId = -1;
+      if (volumeIds[i] != null && volumeIds[i].isValid()) {
+        String volumeIdString = volumeIds[i].toString();
+        byte[] volumeIdBytes = Base64.decodeBase64(volumeIdString);
+
+        if (volumeIdBytes.length == 4) {
+          diskId = Bytes.toInt(volumeIdBytes);
+        } else if (volumeIdBytes.length == 1) {
+          diskId = (int) volumeIdBytes[0];  // support hadoop-2.0.2
+        }
+      }
+      diskIds[i] = diskId;
+    }
+    return diskIds;
+  }
+
+  /**
+   * Generate a map from each host to the set of volume (disk) ids stored on it.
+   *
+   */
+  private Map<String, Set<Integer>> getVolumeMap(List<Fragment> frags) {
+    Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
+    for (Fragment frag : frags) {
+      String[] hosts = frag.getHosts();
+      int[] diskIds = frag.getDiskIds();
+      for (int i = 0; i < hosts.length; i++) {
+        Set<Integer> volumeList = volumeMap.get(hosts[i]);
+        if (volumeList == null) {
+          volumeList = new HashSet<Integer>();
+          volumeMap.put(hosts[i], volumeList);
+        }
+
+        if (diskIds.length > 0 && diskIds[i] > -1) {
+          volumeList.add(diskIds[i]);
+        }
+      }
+    }
+
+    return volumeMap;
+  }
+  /**
+   * Generate the list of files and make them into fragments (splits).
+   *
+   * @throws IOException
+   */
+  public List<Fragment> getSplits(String tableName, TableMeta meta, Path inputPath) throws IOException {
+    // generate splits
+
+    List<Fragment> splits = new ArrayList<Fragment>();
+    List<FileStatus> files = listStatus(inputPath);
+    FileSystem fs = inputPath.getFileSystem(conf);
+    for (FileStatus file : files) {
+      Path path = file.getPath();
+      long length = file.getLen();
+      if (length > 0) {
+        BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+        boolean splittable = isSplittable(meta, path);
+        if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
+          // supported disk volume
+          BlockStorageLocation[] blockStorageLocations = ((DistributedFileSystem) fs)
+              .getFileBlockStorageLocations(Arrays.asList(blkLocations));
+          if (splittable) {
+            for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
+              splits.add(makeSplit(tableName, meta, path, blockStorageLocation, getDiskIds(blockStorageLocation
+                  .getVolumeIds())));
+            }
+          } else { // Non splittable
+            splits.add(makeNonSplit(tableName, meta, path, 0, length, blockStorageLocations));
+          }
+
+        } else {
+          if (splittable) {
+            for (BlockLocation blockLocation : blkLocations) {
+              splits.add(makeSplit(tableName, meta, path, blockLocation, null));
+            }
+          } else { // Non splittable
+            splits.add(makeNonSplit(tableName, meta, path, 0, length, blkLocations));
+          }
+        }
+      } else {
+        //for zero length files
+        splits.add(makeSplit(tableName, meta, path, 0, length));
+      }
+    }
+
+    LOG.info("Total # of splits: " + splits.size());
+    return splits;
+  }
+
+  private class InvalidInputException extends IOException {
+    public InvalidInputException(List<IOException> errors) {
+      super("Invalid input paths: " + errors);
+    }
+  }
+
+  private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
+      Configuration.class,
+      TableMeta.class,
+      Fragment.class
+  };
+
+  private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
+      Configuration.class,
+      TableMeta.class,
+      Path.class
+  };
+
+  /**
+   * create a scanner instance.
+   */
+  public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, TableMeta meta,
+                                         Fragment fragment) {
+    T result;
+    try {
+      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+      if (meth == null) {
+        meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
+        meth.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(theClass, meth);
+      }
+      result = meth.newInstance(new Object[]{conf, meta, fragment});
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return result;
+  }
+
+  /**
+   * create an appender instance.
+   */
+  public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TableMeta meta,
+                                          Path path) {
+    T result;
+    try {
+      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+      if (meth == null) {
+        meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
+        meth.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(theClass, meth);
+      }
+      result = meth.newInstance(new Object[]{conf, meta, path});
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return result;
+  }
+}
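
AbstractStorageManager resolves appenders through the per-store-type APPENDER_HANDLER_CACHE
(getAppender above) but leaves getScanner(meta, fragment, target) abstract. A concrete manager
can mirror the same lookup for scanners. The sketch below is illustrative only: the class name
and the "tajo.storage.scanner-handler.%s.class" key are assumptions, and handing the target
schema to the scanner is left out.

  package org.apache.tajo.storage;

  import java.io.IOException;

  import org.apache.tajo.catalog.Schema;
  import org.apache.tajo.catalog.TableMeta;
  import org.apache.tajo.conf.TajoConf;

  // Hypothetical subclass, shown only to illustrate the handler-cache pattern.
  public class SampleStorageManager extends AbstractStorageManager {

    protected SampleStorageManager(TajoConf conf) throws IOException {
      super(conf);
    }

    @Override
    public Scanner getScanner(TableMeta meta, Fragment fragment, Schema target) throws IOException {
      String handlerName = meta.getStoreType().name().toLowerCase();
      Class<? extends Scanner> scannerClass = SCANNER_HANDLER_CACHE.get(handlerName);
      if (scannerClass == null) {
        scannerClass = conf.getClass(
            String.format("tajo.storage.scanner-handler.%s.class", handlerName), // assumed key name
            null, Scanner.class);
        if (scannerClass == null) {
          throw new IOException("Unknown Storage Type: " + meta.getStoreType());
        }
        SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
      }
      // A real manager would also push the target schema down to the scanner; omitted in this sketch.
      return newScannerInstance(scannerClass, conf, meta, fragment);
    }
  }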

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
index 4f6dde1..6c31247 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
@@ -23,15 +23,17 @@ import com.google.gson.Gson;
 import com.google.gson.annotations.Expose;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.json.GsonObject;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
+import org.apache.tajo.json.GsonObject;
 import org.apache.tajo.storage.json.StorageGsonHelper;
 import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject, GsonObject {
   protected FragmentProto.Builder builder = null;
@@ -44,8 +46,8 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
   @Expose private boolean distCached = false; // optional
 
   private String[] hosts; // Datanode hostnames
-  private int[] hostsBlockCount; // list of block count of hosts
-  private int[] diskIds;
+  @Expose private int[] hostsBlockCount; // list of block count of hosts
+  @Expose private int[] diskIds;
 
   public Fragment() {
     builder = FragmentProto.newBuilder();
@@ -53,13 +55,13 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
 
   public Fragment(String tableName, Path uri, TableMeta meta, BlockLocation blockLocation, int[] diskIds) throws IOException {
     this();
-    TableMeta newMeta = new TableMetaImpl(meta.getProto());
+    //TableMeta newMeta = new TableMetaImpl(meta.getProto());
+    TableMeta newMeta = meta;
     SchemaProto newSchemaProto = CatalogUtil.getQualfiedSchema(tableName, meta
         .getSchema().getProto());
     newMeta.setSchema(new Schema(newSchemaProto));
-    this.set(tableName, uri, newMeta, blockLocation.getOffset(), blockLocation.getLength());
-    this.hosts = blockLocation.getHosts();
-    this.diskIds = diskIds;
+    this.set(tableName, uri, newMeta, blockLocation.getOffset(), blockLocation.getLength(),
+        blockLocation.getHosts(), diskIds);
   }
 
   // Non splittable
@@ -69,7 +71,7 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
     SchemaProto newSchemaProto = CatalogUtil.getQualfiedSchema(tableName, meta
         .getSchema().getProto());
     newMeta.setSchema(new Schema(newSchemaProto));
-    this.set(tableName, uri, newMeta, start, length);
+    this.set(tableName, uri, newMeta, start, length, null, null);
     this.hosts = hosts;
     this.hostsBlockCount = hostsBlockCount;
   }
@@ -80,26 +82,35 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
     SchemaProto newSchemaProto = CatalogUtil.getQualfiedSchema(fragmentId, meta
         .getSchema().getProto());
     newMeta.setSchema(new Schema(newSchemaProto));
-    this.set(fragmentId, path, newMeta, start, length);
+    this.set(fragmentId, path, newMeta, start, length, null, null);
   }
 
   public Fragment(FragmentProto proto) {
     this();
     TableMeta newMeta = new TableMetaImpl(proto.getMeta());
+    int[] diskIds = new int[proto.getDiskIdsList().size()];
+    int i = 0;
+    for(Integer eachValue: proto.getDiskIdsList()) {
+      diskIds[i++] = eachValue;
+    }
     this.set(proto.getId(), new Path(proto.getPath()), newMeta,
-        proto.getStartOffset(), proto.getLength());
+        proto.getStartOffset(), proto.getLength(),
+        proto.getHostsList().toArray(new String[]{}),
+        diskIds);
     if (proto.hasDistCached() && proto.getDistCached()) {
       distCached = true;
     }
   }
 
   private void set(String tableName, Path path, TableMeta meta, long start,
-      long length) {
+      long length, String[] hosts, int[] diskIds) {
     this.tableName = tableName;
     this.uri = path;
     this.meta = meta;
     this.startOffset = start;
     this.length = length;
+    this.hosts = hosts;
+    this.diskIds = diskIds;
   }
 
 
@@ -234,6 +245,9 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
     frag.uri = uri;
     frag.meta = (TableMeta) (meta != null ? meta.clone() : null);
     frag.distCached = distCached;
+    frag.diskIds = diskIds;
+    frag.hosts = hosts;
+    frag.hostsBlockCount = hostsBlockCount;
     
     return frag;
   }
@@ -256,6 +270,17 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
     builder.setLength(this.length);
     builder.setPath(this.uri.toString());
     builder.setDistCached(this.distCached);
+    if(diskIds != null) {
+      List<Integer> idList = new ArrayList<Integer>();
+      for(int eachId: diskIds) {
+        idList.add(eachId);
+      }
+      builder.addAllDiskIds(idList);
+    }
+
+    if(hosts != null) {
+      builder.addAllHosts(TUtil.newList(hosts));
+    }
 
     return builder.build();
   }
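
With diskIds and hostsBlockCount now @Expose'd for JSON, and hosts plus disk ids written into
the proto in getProto(), a Fragment keeps its block-locality information across serialization.
A small round-trip sketch, assuming a TableMeta 'meta' and a BlockLocation 'blockLocation' with
matching disk ids as produced by AbstractStorageManager.getSplits() (the table name and path are
illustrative):

  int[] diskIds = new int[] {0, 1};
  Fragment original = new Fragment("employee", new Path("/warehouse/employee/data.csv"),
      meta, blockLocation, diskIds);

  FragmentProto proto = original.getProto();  // hosts and disk ids are serialized from now on
  Fragment restored = new Fragment(proto);    // ... and come back on the receiving side

  String[] hosts = restored.getHosts();       // matches blockLocation.getHosts()
  int[] restoredIds = restored.getDiskIds();  // matches the disk ids passed in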

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
index a7a1e4a..582c64f 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -61,7 +62,7 @@ public class MergeScanner implements Scanner {
         currentScanner.close();
       }
       currentFragment = iterator.next();
-      currentScanner = StorageManager.getScanner(conf, meta, currentFragment);
+      currentScanner = StorageManagerFactory.getStorageManager((TajoConf)conf).getScanner(meta, currentFragment);
       currentScanner.init();
       return currentScanner.next();
     } else {
@@ -74,7 +75,7 @@ public class MergeScanner implements Scanner {
     iterator = fragments.iterator();
     if (iterator.hasNext()) {
       currentFragment = iterator.next();
-      currentScanner = StorageManager.getScanner(conf, meta, currentFragment);
+      currentScanner = StorageManagerFactory.getStorageManager((TajoConf)conf).getScanner(meta, currentFragment);
     }
   }
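
MergeScanner now obtains each per-fragment scanner from the factory as well. The core idea is to
chain several fragments behind one Scanner: when the current fragment is exhausted, close its
scanner and open the next one. A minimal sketch of that loop, assuming a TajoConf 'conf', a
TableMeta 'meta' and a List<Fragment> 'fragments' (a standalone illustration, not the
MergeScanner implementation itself):

  Iterator<Fragment> it = fragments.iterator();
  Scanner current = null;
  Tuple tuple;
  while (true) {
    if (current == null) {
      if (!it.hasNext()) {
        break;  // all fragments consumed
      }
      current = StorageManagerFactory.getStorageManager(conf).getScanner(meta, it.next());
      current.init();
    }
    tuple = current.next();
    if (tuple == null) {  // current fragment exhausted; move on to the next one
      current.close();
      current = null;
    } else {
      // consume the tuple
    }
  }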