You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") on the original archive page and is not preserved in this copy.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/12 07:00:05 UTC

[31/32] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
index e4b98d4..406550d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
@@ -21,6 +21,7 @@ package org.apache.tajo.master;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.NetUtils;
 
 import java.util.*;
@@ -81,9 +82,12 @@ public class DefaultFragmentScheduleAlgorithm implements FragmentScheduleAlgorit
   @Override
   public void addFragment(FragmentPair fragmentPair) {
     String[] hosts = fragmentPair.getLeftFragment().getHosts();
-    int[] diskIds = fragmentPair.getLeftFragment().getDiskIds();
+    int[] diskIds = null;
+    if (fragmentPair.getLeftFragment() instanceof FileFragment) {
+      diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds();
+    }
     for (int i = 0; i < hosts.length; i++) {
-      addFragment(hosts[i], diskIds[i], fragmentPair);
+      addFragment(hosts[i], diskIds != null ? diskIds[i] : -1, fragmentPair);
     }
     fragmentNum++;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 77e3257..01137aa 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -44,6 +44,7 @@ import org.apache.tajo.master.querymaster.SubQuery;
 import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.storage.DataLocation;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.worker.FetchImpl;
 
@@ -150,8 +151,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     super.stop();
   }
 
-  private FileFragment[] fragmentsForNonLeafTask;
-  private FileFragment[] broadcastFragmentsForNonLeafTask;
+  private Fragment[] fragmentsForNonLeafTask;
+  private Fragment[] broadcastFragmentsForNonLeafTask;
 
   LinkedList<TaskRequestEvent> taskRequestEvents = new LinkedList<TaskRequestEvent>();
   public void schedule() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java b/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java
index 598b1c5..827386b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.master;
 
 import com.google.common.base.Objects;
-import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
 
 /**
  * FragmentPair consists of two fragments, a left fragment and a right fragment.
@@ -29,23 +29,23 @@ import org.apache.tajo.storage.fragment.FileFragment;
  * For other queries, it is assumed to have only a left fragment.
  */
 public class FragmentPair {
-  private FileFragment leftFragment;
-  private FileFragment rightFragment;
+  private Fragment leftFragment;
+  private Fragment rightFragment;
 
-  public FragmentPair(FileFragment left) {
+  public FragmentPair(Fragment left) {
     this.leftFragment = left;
   }
 
-  public FragmentPair(FileFragment left, FileFragment right) {
+  public FragmentPair(Fragment left, Fragment right) {
     this.leftFragment = left;
     this.rightFragment = right;
   }
 
-  public FileFragment getLeftFragment() {
+  public Fragment getLeftFragment() {
     return leftFragment;
   }
 
-  public FileFragment getRightFragment() {
+  public Fragment getRightFragment() {
     return rightFragment;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 399644c..a9624f8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -40,6 +40,7 @@ import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.exception.*;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.conf.TajoConf;
@@ -336,6 +337,15 @@ public class GlobalEngine extends AbstractService {
         responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
       }
     } else { // it requires distributed execution. So, the query is forwarded to a query master.
+      StoreType storeType = PlannerUtil.getStoreType(plan);
+      if (storeType != null) {
+        StorageManager sm = StorageManager.getStorageManager(context.getConf(), storeType);
+        StorageProperty storageProperty = sm.getStorageProperty();
+        if (!storageProperty.isSupportsInsertInto()) {
+          throw new VerifyException("Inserting into non-file storage is not supported.");
+        }
+        sm.beforeInsertOrCATS(rootNode.getChild());
+      }
       context.getSystemMetrics().counter("Query", "numDMLQuery").inc();
       hookManager.doHooks(queryContext, plan);
 
@@ -348,6 +358,7 @@ public class GlobalEngine extends AbstractService {
         responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
         responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
         responseBuilder.setErrorMessage("Fail starting QueryMaster.");
+        LOG.error("Fail starting QueryMaster: " + sql);
       } else {
         responseBuilder.setIsForwarded(true);
         responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
@@ -356,7 +367,8 @@ public class GlobalEngine extends AbstractService {
           responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
         }
         responseBuilder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
-        LOG.info("Query is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
+        LOG.info("Query " + queryInfo.getQueryId().toString() + "," + queryInfo.getSql() + "," +
+            " is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
       }
     }
 
@@ -556,6 +568,7 @@ public class GlobalEngine extends AbstractService {
     LOG.info("=============================================");
 
     annotatedPlanVerifier.verify(queryContext, state, plan);
+    verifyInsertTableSchema(queryContext, state, plan);
 
     if (!state.verified()) {
       StringBuilder sb = new StringBuilder();
@@ -568,6 +581,25 @@ public class GlobalEngine extends AbstractService {
     return plan;
   }
 
+  private void verifyInsertTableSchema(QueryContext queryContext, VerificationState state, LogicalPlan plan) {
+    StoreType storeType = PlannerUtil.getStoreType(plan);
+    if (storeType != null) {
+      LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+      if (rootNode.getChild().getType() == NodeType.INSERT) {
+        try {
+          TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild());
+          InsertNode iNode = rootNode.getChild();
+          Schema outSchema = iNode.getChild().getOutSchema();
+
+          StorageManager.getStorageManager(queryContext.getConf(), storeType)
+              .verifyInsertTableSchema(tableDesc, outSchema);
+        } catch (Throwable t) {
+          state.addVerification(t.getMessage());
+        }
+      }
+    }
+  }
+
   /**
    * Alter a given table
    */
@@ -730,32 +762,18 @@ public class GlobalEngine extends AbstractService {
       meta = CatalogUtil.newTableMeta(createTable.getStorageType());
     }
 
-    if(createTable.isExternal()){
+    if(PlannerUtil.isFileStorageType(createTable.getStorageType()) && createTable.isExternal()){
       Preconditions.checkState(createTable.hasPath(), "ERROR: LOCATION must be given.");
-    } else {
-      String databaseName;
-      String tableName;
-      if (CatalogUtil.isFQTableName(createTable.getTableName())) {
-        databaseName = CatalogUtil.extractQualifier(createTable.getTableName());
-        tableName = CatalogUtil.extractSimpleName(createTable.getTableName());
-      } else {
-        databaseName = queryContext.getCurrentDatabase();
-        tableName = createTable.getTableName();
-      }
-
-      // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} )
-      Path tablePath = StorageUtil.concatPath(sm.getWarehouseDir(), databaseName, tableName);
-      createTable.setPath(tablePath);
     }
 
-    return createTableOnPath(queryContext, createTable.getTableName(), createTable.getTableSchema(),
-        meta, createTable.getPath(), createTable.isExternal(), createTable.getPartitionMethod(), ifNotExists);
+    return createTable(queryContext, createTable.getTableName(), createTable.getStorageType(),
+        createTable.getTableSchema(), meta, createTable.getPath(), createTable.isExternal(),
+        createTable.getPartitionMethod(), ifNotExists);
   }
 
-  public TableDesc createTableOnPath(QueryContext queryContext, String tableName, Schema schema, TableMeta meta,
-                                     Path path, boolean isExternal, PartitionMethodDesc partitionDesc,
-                                     boolean ifNotExists)
-      throws IOException {
+  public TableDesc createTable(QueryContext queryContext, String tableName, StoreType storeType,
+                               Schema schema, TableMeta meta, Path path, boolean isExternal,
+                               PartitionMethodDesc partitionDesc, boolean ifNotExists) throws IOException {
     String databaseName;
     String simpleTableName;
     if (CatalogUtil.isFQTableName(tableName)) {
@@ -779,39 +797,15 @@ public class GlobalEngine extends AbstractService {
       }
     }
 
-    FileSystem fs = path.getFileSystem(context.getConf());
-
-    if (isExternal) {
-      if(!fs.exists(path)) {
-        LOG.error("ERROR: " + path.toUri() + " does not exist");
-        throw new IOException("ERROR: " + path.toUri() + " does not exist");
-      }
-    } else {
-      fs.mkdirs(path);
-    }
-
-    long totalSize = 0;
-
-    try {
-      totalSize = sm.calculateSize(path);
-    } catch (IOException e) {
-      LOG.warn("Cannot calculate the size of the relation", e);
-    }
-
-    TableStats stats = new TableStats();
-    stats.setNumBytes(totalSize);
-
-    if (isExternal) { // if it is an external table, there is no way to know the exact row number without processing.
-      stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
-    }
-
     TableDesc desc = new TableDesc(CatalogUtil.buildFQName(databaseName, simpleTableName),
-        schema, meta, path.toUri(), isExternal);
-    desc.setStats(stats);
+        schema, meta, (path != null ? path.toUri(): null), isExternal);
+    
     if (partitionDesc != null) {
       desc.setPartitionMethod(partitionDesc);
     }
 
+    StorageManager.getStorageManager(queryContext.getConf(), storeType).createTable(desc, ifNotExists);
+
     if (catalog.createTable(desc)) {
       LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")");
       return desc;
@@ -905,13 +899,13 @@ public class GlobalEngine extends AbstractService {
       }
     }
 
-    Path path = new Path(catalog.getTableDesc(qualifiedName).getPath());
+    TableDesc tableDesc = catalog.getTableDesc(qualifiedName);
     catalog.dropTable(qualifiedName);
 
     if (purge) {
       try {
-        FileSystem fs = path.getFileSystem(context.getConf());
-        fs.delete(path, true);
+        StorageManager.getStorageManager(queryContext.getConf(),
+            tableDesc.getMeta().getStoreType()).purgeTable(tableDesc);
       } catch (IOException e) {
         throw new InternalError(e.getMessage());
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
index 82fd6fc..56cf8e5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.tajo.master.DefaultFragmentScheduleAlgorithm.FragmentsPerDisk;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.TUtil;
 
@@ -101,9 +102,12 @@ public class GreedyFragmentScheduleAlgorithm implements FragmentScheduleAlgorith
   @Override
   public void addFragment(FragmentPair fragmentPair) {
     String[] hosts = fragmentPair.getLeftFragment().getHosts();
-    int[] diskIds = fragmentPair.getLeftFragment().getDiskIds();
+    int[] diskIds = null;
+    if (fragmentPair.getLeftFragment() instanceof FileFragment) {
+      diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds();
+    }
     for (int i = 0; i < hosts.length; i++) {
-      addFragment(hosts[i], diskIds[i], fragmentPair);
+      addFragment(hosts[i], diskIds != null ? diskIds[i] : -1, fragmentPair);
     }
     totalFragmentNum++;
   }
@@ -276,23 +280,27 @@ public class GreedyFragmentScheduleAlgorithm implements FragmentScheduleAlgorith
 
   public void removeFragment(FragmentPair fragmentPair) {
     String [] hosts = fragmentPair.getLeftFragment().getHosts();
-    int[] diskIds = fragmentPair.getLeftFragment().getDiskIds();
+    int[] diskIds = null;
+    if (fragmentPair.getLeftFragment() instanceof FileFragment) {
+      diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds();
+    }
     for (int i = 0; i < hosts.length; i++) {
+      int diskId = diskIds == null ? -1 : diskIds[i];
       String normalizedHost = NetUtils.normalizeHost(hosts[i]);
       Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost);
 
       if (diskFragmentMap != null) {
-        FragmentsPerDisk fragmentsPerDisk = diskFragmentMap.get(diskIds[i]);
+        FragmentsPerDisk fragmentsPerDisk = diskFragmentMap.get(diskId);
         if (fragmentsPerDisk != null) {
           boolean isRemoved = fragmentsPerDisk.removeFragmentPair(fragmentPair);
           if (isRemoved) {
             if (fragmentsPerDisk.size() == 0) {
-              diskFragmentMap.remove(diskIds[i]);
+              diskFragmentMap.remove(diskId);
               if (diskFragmentMap.size() == 0) {
                 fragmentHostMapping.remove(normalizedHost);
               }
             }
-            HostAndDisk hostAndDisk = new HostAndDisk(normalizedHost, diskIds[i]);
+            HostAndDisk hostAndDisk = new HostAndDisk(normalizedHost, diskId);
             if (totalHostPriority.containsKey(hostAndDisk)) {
               PrioritizedHost prioritizedHost = totalHostPriority.get(hostAndDisk);
               updateHostPriority(prioritizedHost.hostAndDisk, prioritizedHost.priority-1);

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
index b2883cc..cc99453 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
@@ -38,7 +39,9 @@ import org.apache.tajo.master.querymaster.QueryUnit;
 import org.apache.tajo.master.querymaster.QueryUnitAttempt;
 import org.apache.tajo.master.querymaster.SubQuery;
 import org.apache.tajo.master.container.TajoContainerId;
+import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.worker.FetchImpl;
 
@@ -197,15 +200,17 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
     if (event.getType() == EventType.T_SCHEDULE) {
       if (event instanceof FragmentScheduleEvent) {
         FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event;
-        Collection<FileFragment> rightFragments = castEvent.getRightFragments();
+        Collection<Fragment> rightFragments = castEvent.getRightFragments();
         if (rightFragments == null || rightFragments.isEmpty()) {
           scheduledFragments.addFragment(new FragmentPair(castEvent.getLeftFragment(), null));
         } else {
-          for (FileFragment eachFragment: rightFragments) {
+          for (Fragment eachFragment: rightFragments) {
             scheduledFragments.addFragment(new FragmentPair(castEvent.getLeftFragment(), eachFragment));
           }
         }
-        initDiskBalancer(castEvent.getLeftFragment().getHosts(), castEvent.getLeftFragment().getDiskIds());
+        if (castEvent.getLeftFragment() instanceof FileFragment) {
+          initDiskBalancer(castEvent.getLeftFragment().getHosts(), ((FileFragment)castEvent.getLeftFragment()).getDiskIds());
+        }
       } else if (event instanceof FetchScheduleEvent) {
         FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
         scheduledFetches.addFetch(castEvent.getFetches());
@@ -366,6 +371,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
       long taskSize = adjustTaskSize();
       LOG.info("Adjusted task size: " + taskSize);
 
+      TajoConf conf = subQuery.getContext().getConf();
       // host local, disk local
       String normalized = NetUtils.normalizeHost(host);
       Integer diskId = hostDiskBalancerMap.get(normalized).getDiskId(container.containerID);
@@ -376,13 +382,14 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
             break;
           }
 
-          if (assignedFragmentSize + fragmentPair.getLeftFragment().getEndKey() > taskSize) {
+          if (assignedFragmentSize +
+              StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()) > taskSize) {
             break;
           } else {
             fragmentPairs.add(fragmentPair);
-            assignedFragmentSize += fragmentPair.getLeftFragment().getEndKey();
+            assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment());
             if (fragmentPair.getRightFragment() != null) {
-              assignedFragmentSize += fragmentPair.getRightFragment().getEndKey();
+              assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getRightFragment());
             }
           }
           scheduledFragments.removeFragment(fragmentPair);
@@ -398,13 +405,14 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
             break;
           }
 
-          if (assignedFragmentSize + fragmentPair.getLeftFragment().getEndKey() > taskSize) {
+          if (assignedFragmentSize +
+              StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()) > taskSize) {
             break;
           } else {
             fragmentPairs.add(fragmentPair);
-            assignedFragmentSize += fragmentPair.getLeftFragment().getEndKey();
+            assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment());
             if (fragmentPair.getRightFragment() != null) {
-              assignedFragmentSize += fragmentPair.getRightFragment().getEndKey();
+              assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getRightFragment());
             }
           }
           scheduledFragments.removeFragment(fragmentPair);

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
index 768528d..64081f3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
@@ -26,7 +26,6 @@ import org.apache.tajo.QueryUnitId;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.physical.PhysicalPlanUtil;
 import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.engine.planner.physical.SeqScanExec;
 import org.apache.tajo.engine.query.QueryContext;
@@ -34,6 +33,8 @@ import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
@@ -41,7 +42,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class NonForwardQueryResultScanner {
-  private static final int MAX_FILE_NUM_PER_SCAN = 100;
+  private static final int MAX_FRAGMENT_NUM_PER_SCAN = 100;
 
   private QueryId queryId;
   private String sessionId;
@@ -54,7 +55,7 @@ public class NonForwardQueryResultScanner {
   private TajoConf tajoConf;
   private ScanNode scanNode;
 
-  private int currentFileIndex = 0;
+  private int currentFragmentIndex = 0;
 
   public NonForwardQueryResultScanner(TajoConf tajoConf, String sessionId,
                                       QueryId queryId,
@@ -76,23 +77,24 @@ public class NonForwardQueryResultScanner {
   }
 
   private void initSeqScanExec() throws IOException {
-    FragmentProto[] fragments = PhysicalPlanUtil.getNonZeroLengthDataFiles(tajoConf, tableDesc,
-        currentFileIndex, MAX_FILE_NUM_PER_SCAN);
-    if (fragments != null && fragments.length > 0) {
+    List<Fragment> fragments = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType())
+        .getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN);
+
+    if (fragments != null && !fragments.isEmpty()) {
+      FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[]{}));
       this.taskContext = new TaskAttemptContext(
           new QueryContext(tajoConf), null,
           new QueryUnitAttemptId(new QueryUnitId(new ExecutionBlockId(queryId, 1), 0), 0),
-          fragments, null);
+          fragmentProtos, null);
 
       try {
         // scanNode must be clone cause SeqScanExec change target in the case of a partitioned table.
-        scanExec = new SeqScanExec(taskContext,
-            StorageManager.getStorageManager(tajoConf), (ScanNode)scanNode.clone(), fragments);
+        scanExec = new SeqScanExec(taskContext, (ScanNode)scanNode.clone(), fragmentProtos);
       } catch (CloneNotSupportedException e) {
         throw new IOException(e.getMessage(), e);
       }
       scanExec.init();
-      currentFileIndex += fragments.length;
+      currentFragmentIndex += fragments.size();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 604cfe0..f307127 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -53,6 +53,7 @@ import org.apache.tajo.rule.EvaluationContext;
 import org.apache.tajo.rule.EvaluationFailedException;
 import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
 import org.apache.tajo.rule.SelfDiagnosisRuleSession;
+import org.apache.tajo.storage.FileStorageManager;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.util.*;
 import org.apache.tajo.util.history.HistoryReader;
@@ -110,7 +111,7 @@ public class TajoMaster extends CompositeService {
 
   private CatalogServer catalogServer;
   private CatalogService catalog;
-  private StorageManager storeManager;
+  private FileStorageManager storeManager;
   private GlobalEngine globalEngine;
   private AsyncDispatcher dispatcher;
   private TajoMasterClientService tajoMasterClientService;
@@ -171,7 +172,7 @@ public class TajoMaster extends CompositeService {
       // check the system directory and create if they are not created.
       checkAndInitializeSystemDirectories();
       diagnoseTajoMaster();
-      this.storeManager = StorageManager.getStorageManager(systemConf);
+      this.storeManager = (FileStorageManager)StorageManager.getFileStorageManager(systemConf, null);
 
       catalogServer = new CatalogServer(FunctionLoader.load());
       addIfService(catalogServer);
@@ -422,7 +423,7 @@ public class TajoMaster extends CompositeService {
     return this.catalogServer;
   }
 
-  public StorageManager getStorageManager() {
+  public FileStorageManager getStorageManager() {
     return this.storeManager;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index b420a65..7014034 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -814,7 +814,8 @@ public class TajoMasterClientService extends AbstractService {
 
         TableDesc desc;
         try {
-          desc = context.getGlobalEngine().createTableOnPath(queryContext, request.getName(), schema,
+          desc = context.getGlobalEngine().createTable(queryContext, request.getName(),
+              meta.getStoreType(), schema,
               meta, path, true, partitionDesc, false);
         } catch (Exception e) {
           return TableResponse.newBuilder()

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/event/FragmentScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/FragmentScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/FragmentScheduleEvent.java
index 8cc17cb..9a7cc76 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/FragmentScheduleEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/FragmentScheduleEvent.java
@@ -19,23 +19,23 @@
 package org.apache.tajo.master.event;
 
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
 
 import java.util.Collection;
 
 public class FragmentScheduleEvent extends TaskSchedulerEvent {
-  private final FileFragment leftFragment;
-  private final Collection<FileFragment> rightFragments;
+  private final Fragment leftFragment;
+  private final Collection<Fragment> rightFragments;
 
   public FragmentScheduleEvent(final EventType eventType, final ExecutionBlockId blockId,
-                               final FileFragment fragment) {
+                               final Fragment fragment) {
     this(eventType, blockId, fragment, null);
   }
 
   public FragmentScheduleEvent(final EventType eventType,
                                final ExecutionBlockId blockId,
-                               final FileFragment leftFragment,
-                               final Collection<FileFragment> rightFragments) {
+                               final Fragment leftFragment,
+                               final Collection<Fragment> rightFragments) {
     super(eventType, blockId);
     this.leftFragment = leftFragment;
     this.rightFragments = rightFragments;
@@ -45,11 +45,11 @@ public class FragmentScheduleEvent extends TaskSchedulerEvent {
     return this.rightFragments != null && !this.rightFragments.isEmpty();
   }
 
-  public FileFragment getLeftFragment() {
+  public Fragment getLeftFragment() {
     return leftFragment;
   }
 
-  public Collection<FileFragment> getRightFragments() { return rightFragments; }
+  public Collection<Fragment> getRightFragments() { return rightFragments; }
 
   @Override
   public String toString() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index f92001f..a048780 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -23,7 +23,6 @@ import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -32,33 +31,28 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.SessionVars;
-import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.plan.logical.CreateTableNode;
-import org.apache.tajo.plan.logical.InsertNode;
-import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.event.*;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.StorageConstants;
-import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.history.QueryHistory;
 import org.apache.tajo.util.history.SubQueryHistory;
 
 import java.io.IOException;
-import java.text.NumberFormat;
 import java.util.*;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -74,7 +68,6 @@ public class Query implements EventHandler<QueryEvent> {
   private Map<ExecutionBlockId, SubQuery> subqueries;
   private final EventHandler eventHandler;
   private final MasterPlan plan;
-  private final StorageManager sm;
   QueryMasterTask.QueryMasterTaskContext context;
   private ExecutionBlockCursor cursor;
 
@@ -216,7 +209,6 @@ public class Query implements EventHandler<QueryEvent> {
     subqueries = Maps.newHashMap();
     this.eventHandler = eventHandler;
     this.plan = plan;
-    this.sm = context.getStorageManager();
     this.cursor = new ExecutionBlockCursor(plan, true);
 
     StringBuilder sb = new StringBuilder("\n=======================================================");
@@ -398,7 +390,7 @@ public class Query implements EventHandler<QueryEvent> {
 
       query.setStartTime();
       SubQuery subQuery = new SubQuery(query.context, query.getPlan(),
-          query.getExecutionBlockCursor().nextBlock(), query.sm);
+          query.getExecutionBlockCursor().nextBlock());
       subQuery.setPriority(query.priority--);
       query.addSubQuery(subQuery);
 
@@ -423,6 +415,20 @@ public class Query implements EventHandler<QueryEvent> {
       } else {
         finalState = QueryState.QUERY_ERROR;
       }
+      if (finalState != QueryState.QUERY_SUCCEEDED) {
+        SubQuery lastStage = query.getSubQuery(subQueryEvent.getExecutionBlockId());
+        if (lastStage != null && lastStage.getTableMeta() != null) {
+          StoreType storeType = lastStage.getTableMeta().getStoreType();
+          if (storeType != null) {
+            LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
+            try {
+              StorageManager.getStorageManager(query.systemConf, storeType).rollbackOutputCommit(rootNode.getChild());
+            } catch (IOException e) {
+              LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
+            }
+          }
+        }
+      }
       query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
       query.setFinishTime();
 
@@ -430,356 +436,27 @@ public class Query implements EventHandler<QueryEvent> {
     }
 
     private QueryState finalizeQuery(Query query, QueryCompletedEvent event) {
-      MasterPlan masterPlan = query.getPlan();
+      SubQuery lastStage = query.getSubQuery(event.getExecutionBlockId());
+      StoreType storeType = lastStage.getTableMeta().getStoreType();
+      try {
+        LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
+        CatalogService catalog = lastStage.getContext().getQueryMasterContext().getWorkerContext().getCatalog();
+        TableDesc tableDesc =  PlannerUtil.getTableDesc(catalog, rootNode.getChild());
 
-      ExecutionBlock terminal = query.getPlan().getTerminalBlock();
-      DataChannel finalChannel = masterPlan.getChannel(event.getExecutionBlockId(), terminal.getId());
+        Path finalOutputDir = StorageManager.getStorageManager(query.systemConf, storeType)
+            .commitOutputData(query.context.getQueryContext(),
+                lastStage.getId(), lastStage.getMasterPlan().getLogicalPlan(), lastStage.getSchema(), tableDesc);
 
-      QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
-      try {
-        Path finalOutputDir = commitOutputData(query);
+        QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
         hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir);
-      } catch (Throwable t) {
-        query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(t)));
+      } catch (Exception e) {
+        query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
         return QueryState.QUERY_ERROR;
       }
 
       return QueryState.QUERY_SUCCEEDED;
     }
 
-    /**
-     * It moves a result data stored in a staging output dir into a final output dir.
-     */
-    public Path commitOutputData(Query query) throws IOException {
-      QueryContext queryContext = query.context.getQueryContext();
-      Path stagingResultDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME);
-      Path finalOutputDir;
-      if (queryContext.hasOutputPath()) {
-        finalOutputDir = queryContext.getOutputPath();
-        try {
-          FileSystem fs = stagingResultDir.getFileSystem(query.systemConf);
-
-          if (queryContext.isOutputOverwrite()) { // INSERT OVERWRITE INTO
-
-            // It moves the original table into the temporary location.
-            // Then it moves the new result table into the original table location.
-            // Upon failed, it recovers the original table if possible.
-            boolean movedToOldTable = false;
-            boolean committed = false;
-            Path oldTableDir = new Path(queryContext.getStagingDir(), TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
-            ContentSummary summary = fs.getContentSummary(stagingResultDir);
-
-            if (queryContext.hasPartition() && summary.getFileCount() > 0L) {
-              // This is a map for existing non-leaf directory to rename. A key is current directory and a value is
-              // renaming directory.
-              Map<Path, Path> renameDirs = TUtil.newHashMap();
-              // This is a map for recovering existing partition directory. A key is current directory and a value is
-              // temporary directory to back up.
-              Map<Path, Path> recoveryDirs = TUtil.newHashMap();
-
-              try {
-                if (!fs.exists(finalOutputDir)) {
-                  fs.mkdirs(finalOutputDir);
-                }
-
-                visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(),
-                    renameDirs, oldTableDir);
-
-                // Rename target partition directories
-                for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
-                  // Backup existing data files for recovering
-                  if (fs.exists(entry.getValue())) {
-                    String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(),
-                        oldTableDir.toString());
-                    Path recoveryPath = new Path(recoveryPathString);
-                    fs.rename(entry.getValue(), recoveryPath);
-                    fs.exists(recoveryPath);
-                    recoveryDirs.put(entry.getValue(), recoveryPath);
-                  }
-                  // Delete existing directory
-                  fs.delete(entry.getValue(), true);
-                  // Rename staging directory to final output directory
-                  fs.rename(entry.getKey(), entry.getValue());
-                }
-
-              } catch (IOException ioe) {
-                // Remove created dirs
-                for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
-                  fs.delete(entry.getValue(), true);
-                }
-
-                // Recovery renamed dirs
-                for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) {
-                  fs.delete(entry.getValue(), true);
-                  fs.rename(entry.getValue(), entry.getKey());
-                }
-
-                throw new IOException(ioe.getMessage());
-              }
-            } else { // no partition
-              try {
-
-                // if the final output dir exists, move all contents to the temporary table dir.
-                // Otherwise, just make the final output dir. As a result, the final output dir will be empty.
-                if (fs.exists(finalOutputDir)) {
-                  fs.mkdirs(oldTableDir);
-
-                  for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) {
-                    fs.rename(status.getPath(), oldTableDir);
-                  }
-
-                  movedToOldTable = fs.exists(oldTableDir);
-                } else { // if the parent does not exist, make its parent directory.
-                  fs.mkdirs(finalOutputDir);
-                }
-
-                // Move the results to the final output dir.
-                for (FileStatus status : fs.listStatus(stagingResultDir)) {
-                  fs.rename(status.getPath(), finalOutputDir);
-                }
-
-                // Check the final output dir
-                committed = fs.exists(finalOutputDir);
-
-              } catch (IOException ioe) {
-                // recover the old table
-                if (movedToOldTable && !committed) {
-
-                  // if commit is failed, recover the old data
-                  for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) {
-                    fs.delete(status.getPath(), true);
-                  }
-
-                  for (FileStatus status : fs.listStatus(oldTableDir)) {
-                    fs.rename(status.getPath(), finalOutputDir);
-                  }
-                }
-
-                throw new IOException(ioe.getMessage());
-              }
-            }
-          } else {
-            NodeType queryType = queryContext.getCommandType();
-
-            if (queryType == NodeType.INSERT) { // INSERT INTO an existing table
-
-              NumberFormat fmt = NumberFormat.getInstance();
-              fmt.setGroupingUsed(false);
-              fmt.setMinimumIntegerDigits(3);
-
-              if (queryContext.hasPartition()) {
-                for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
-                  if (eachFile.isFile()) {
-                    LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath());
-                    continue;
-                  }
-                  moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1);
-                }
-              } else {
-                int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1;
-                for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
-                  moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++);
-                }
-              }
-              // checking all file moved and remove empty dir
-              verifyAllFileMoved(fs, stagingResultDir);
-              FileStatus[] files = fs.listStatus(stagingResultDir);
-              if (files != null && files.length != 0) {
-                for (FileStatus eachFile: files) {
-                  LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
-                }
-              }
-            } else { // CREATE TABLE AS SELECT (CTAS)
-              if (fs.exists(finalOutputDir)) {
-                for (FileStatus status : fs.listStatus(stagingResultDir)) {
-                  fs.rename(status.getPath(), finalOutputDir);
-                }
-              } else {
-                fs.rename(stagingResultDir, finalOutputDir);
-              }
-              LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir);
-            }
-          }
-
-          // remove the staging directory if the final output dir is given.
-          Path stagingDirRoot = queryContext.getStagingDir().getParent();
-          fs.delete(stagingDirRoot, true);
-
-        } catch (Throwable t) {
-          LOG.error(t);
-          throw new IOException(t);
-        }
-      } else {
-        finalOutputDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME);
-      }
-
-      return finalOutputDir;
-    }
-
-    /**
-     * This method sets a rename map which includes renamed staging directory to final output directory recursively.
-     * If there exists some data files, this delete it for duplicate data.
-     *
-     *
-     * @param fs
-     * @param stagingPath
-     * @param outputPath
-     * @param stagingParentPathString
-     * @throws IOException
-     */
-    private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath,
-                                        String stagingParentPathString,
-                                        Map<Path, Path> renameDirs, Path oldTableDir) throws IOException {
-      FileStatus[] files = fs.listStatus(stagingPath);
-
-      for(FileStatus eachFile : files) {
-        if (eachFile.isDirectory()) {
-          Path oldPath = eachFile.getPath();
-
-          // Make recover directory.
-          String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString,
-          oldTableDir.toString());
-          Path recoveryPath = new Path(recoverPathString);
-          if (!fs.exists(recoveryPath)) {
-            fs.mkdirs(recoveryPath);
-          }
-
-          visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString,
-          renameDirs, oldTableDir);
-          // Find last order partition for renaming
-          String newPathString = oldPath.toString().replaceAll(stagingParentPathString,
-          outputPath.toString());
-          Path newPath = new Path(newPathString);
-          if (!isLeafDirectory(fs, eachFile.getPath())) {
-           renameDirs.put(eachFile.getPath(), newPath);
-          } else {
-            if (!fs.exists(newPath)) {
-             fs.mkdirs(newPath);
-            }
-          }
-        }
-      }
-    }
-
-    private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException {
-      boolean retValue = false;
-
-      FileStatus[] files = fs.listStatus(path);
-      for (FileStatus file : files) {
-        if (fs.isDirectory(file.getPath())) {
-          retValue = true;
-          break;
-        }
-      }
-
-      return retValue;
-    }
-
-    private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException {
-      FileStatus[] files = fs.listStatus(stagingPath);
-      if (files != null && files.length != 0) {
-        for (FileStatus eachFile: files) {
-          if (eachFile.isFile()) {
-            LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
-            return false;
-          } else {
-            if (verifyAllFileMoved(fs, eachFile.getPath())) {
-              fs.delete(eachFile.getPath(), false);
-            } else {
-              return false;
-            }
-          }
-        }
-      }
-
-      return true;
-    }
-
-    /**
-     * Attach the sequence number to a path.
-     *
-     * @param path Path
-     * @param seq sequence number
-     * @param nf Number format
-     * @return New path attached with sequence number
-     * @throws IOException
-     */
-    private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException {
-      String[] tokens = path.getName().split("-");
-      if (tokens.length != 4) {
-        throw new IOException("Wrong result file name:" + path);
-      }
-      return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq);
-    }
-
-    /**
-     * Attach the sequence number to the output file name and than move the file into the final result path.
-     *
-     * @param fs FileSystem
-     * @param stagingResultDir The staging result dir
-     * @param fileStatus The file status
-     * @param finalOutputPath Final output path
-     * @param nf Number format
-     * @param fileSeq The sequence number
-     * @throws IOException
-     */
-    private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir,
-                                            FileStatus fileStatus, Path finalOutputPath,
-                                            NumberFormat nf,
-                                            int fileSeq) throws IOException {
-      if (fileStatus.isDirectory()) {
-        String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
-        if (subPath != null) {
-          Path finalSubPath = new Path(finalOutputPath, subPath);
-          if (!fs.exists(finalSubPath)) {
-            fs.mkdirs(finalSubPath);
-          }
-          int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false);
-          for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) {
-            moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq);
-          }
-        } else {
-          throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath());
-        }
-      } else {
-        String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
-        if (subPath != null) {
-          Path finalSubPath = new Path(finalOutputPath, subPath);
-          finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf));
-          if (!fs.exists(finalSubPath.getParent())) {
-            fs.mkdirs(finalSubPath.getParent());
-          }
-          if (fs.exists(finalSubPath)) {
-            throw new IOException("Already exists data file:" + finalSubPath);
-          }
-          boolean success = fs.rename(fileStatus.getPath(), finalSubPath);
-          if (success) {
-            LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " +
-                "to final output[" + finalSubPath + "]");
-          } else {
-            LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " +
-                "to final output[" + finalSubPath + "]");
-          }
-        }
-      }
-    }
-
-    private String extractSubPath(Path parentPath, Path childPath) {
-      String parentPathStr = parentPath.toUri().getPath();
-      String childPathStr = childPath.toUri().getPath();
-
-      if (parentPathStr.length() > childPathStr.length()) {
-        return null;
-      }
-
-      int index = childPathStr.indexOf(parentPathStr);
-      if (index != 0) {
-        return null;
-      }
-
-      return childPathStr.substring(parentPathStr.length() + 1);
-    }
-
     private static interface QueryHook {
       boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir);
       void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query,
@@ -947,7 +624,7 @@ public class Query implements EventHandler<QueryEvent> {
     private void executeNextBlock(Query query) {
       ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
       ExecutionBlock nextBlock = cursor.nextBlock();
-      SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock, query.sm);
+      SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock);
       nextSubQuery.setPriority(query.priority--);
       query.addSubQuery(nextSubQuery);
       nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(), SubQueryEventType.SQ_INIT));

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index f2d8b3a..42fac3a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -41,7 +41,6 @@ import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.util.HAServiceUtil;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.history.QueryHistory;
@@ -71,8 +70,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
   private GlobalPlanner globalPlanner;
 
-  private StorageManager storageManager;
-
   private TajoConf systemConf;
 
   private Map<QueryId, QueryMasterTask> queryMasterTasks = Maps.newConcurrentMap();
@@ -116,8 +113,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
       this.dispatcher = new TajoAsyncDispatcher("querymaster_" + System.currentTimeMillis());
       addIfService(dispatcher);
 
-      this.storageManager = StorageManager.getStorageManager(systemConf);
-
       globalPlanner = new GlobalPlanner(systemConf, workerContext);
 
       dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler());
@@ -373,10 +368,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
       return clock;
     }
 
-    public StorageManager getStorageManager() {
-      return storageManager;
-    }
-
     public QueryMaster getQueryMaster() {
       return QueryMaster.this;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 75d8ab6..1eaef0f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -35,10 +35,13 @@ import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.algebra.JsonHelper;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.plan.rewrite.RewriteRule;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.plan.logical.LogicalNode;
@@ -54,10 +57,12 @@ import org.apache.tajo.master.TajoContainerProxy;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.master.session.Session;
+import org.apache.tajo.plan.verifier.VerifyException;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.StorageProperty;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.HAServiceUtil;
 import org.apache.tajo.util.metrics.TajoMetrics;
@@ -348,6 +353,8 @@ public class QueryMasterTask extends CompositeService {
   }
 
   public synchronized void startQuery() {
+    StorageManager sm = null;
+    LogicalPlan plan = null;
     try {
       if (query != null) {
         LOG.warn("Query already started");
@@ -358,7 +365,29 @@ public class QueryMasterTask extends CompositeService {
       LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
       Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
       jsonExpr = null; // remove the possible OOM
-      LogicalPlan plan = planner.createPlan(queryContext, expr);
+      plan = planner.createPlan(queryContext, expr);
+
+      StoreType storeType = PlannerUtil.getStoreType(plan);
+      if (storeType != null) {
+        sm = StorageManager.getStorageManager(systemConf, storeType);
+        StorageProperty storageProperty = sm.getStorageProperty();
+        if (storageProperty.isSortedInsert()) {
+          String tableName = PlannerUtil.getStoreTableName(plan);
+          LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+          TableDesc tableDesc =  PlannerUtil.getTableDesc(catalog, rootNode.getChild());
+          if (tableDesc == null) {
+            throw new VerifyException("Can't get table meta data from catalog: " + tableName);
+          }
+          List<RewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules(
+              getQueryTaskContext().getQueryContext(), tableDesc);
+          if (storageSpecifiedRewriteRules != null) {
+            for (RewriteRule eachRule: storageSpecifiedRewriteRules) {
+              optimizer.addRuleAfterToJoinOpt(eachRule);
+            }
+          }
+        }
+      }
+
       optimizer.optimize(queryContext, plan);
 
       GlobalEngine.DistributedQueryHookManager hookManager = new GlobalEngine.DistributedQueryHookManager();
@@ -393,6 +422,15 @@ public class QueryMasterTask extends CompositeService {
     } catch (Throwable t) {
       LOG.error(t.getMessage(), t);
       initError = t;
+
+      if (plan != null && sm != null) {
+        LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+        try {
+          sm.rollbackOutputCommit(rootNode.getChild());
+        } catch (IOException e) { // 'query' is still null if planning failed; log with queryId instead
+          LOG.warn(queryId + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
+        }
+      }
     }
   }
 
@@ -441,8 +479,14 @@ public class QueryMasterTask extends CompositeService {
     // Create Output Directory
     ////////////////////////////////////////////
 
+    String outputPath = context.get(QueryVars.OUTPUT_TABLE_PATH, "");
     if (context.isCreateTable() || context.isInsert()) {
-      stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX, queryId);
+      if (outputPath == null || outputPath.isEmpty()) {
+        // hbase
+        stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
+      } else {
+        stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX, queryId);
+      }
     } else {
       stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
     }
@@ -570,10 +614,6 @@ public class QueryMasterTask extends CompositeService {
       return queryId;
     }
 
-    public StorageManager getStorageManager() {
-      return queryMasterContext.getStorageManager();
-    }
-
     public Path getStagingDir() {
       return queryContext.getStagingDir();
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 0f275e9..75402c2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -43,6 +43,7 @@ import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttem
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.storage.DataLocation;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.TajoIdUtils;
@@ -265,8 +266,13 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 
     List<String> fragmentList = new ArrayList<String>();
     for (FragmentProto eachFragment : getAllFragments()) {
-      FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, eachFragment);
-      fragmentList.add(fileFragment.toString());
+      try {
+        Fragment fragment = FragmentConvertor.convert(systemConf, eachFragment);
+        fragmentList.add(fragment.toString());
+      } catch (Exception e) {
+        LOG.error(e.getMessage());
+        fragmentList.add("ERROR: " + eachFragment.getStoreType() + "," + eachFragment.getId() + ": " + e.getMessage());
+      }
     }
     queryUnitHistory.setFragments(fragmentList.toArray(new String[]{}));
 
@@ -313,15 +319,18 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 	  }
 	}
 
-  private void addDataLocation(FileFragment fragment) {
+  private void addDataLocation(Fragment fragment) {
     String[] hosts = fragment.getHosts();
-    int[] diskIds = fragment.getDiskIds();
+    int[] diskIds = null;
+    if (fragment instanceof FileFragment) {
+      diskIds = ((FileFragment)fragment).getDiskIds();
+    }
     for (int i = 0; i < hosts.length; i++) {
-      dataLocations.add(new DataLocation(hosts[i], diskIds[i]));
+      dataLocations.add(new DataLocation(hosts[i], diskIds == null ? -1 : diskIds[i]));
     }
   }
 
-  public void addFragment(FileFragment fragment, boolean useDataLocation) {
+  public void addFragment(Fragment fragment, boolean useDataLocation) {
     Set<FragmentProto> fragmentProtos;
     if (fragMap.containsKey(fragment.getTableName())) {
       fragmentProtos = fragMap.get(fragment.getTableName());
@@ -336,8 +345,8 @@ public class QueryUnit implements EventHandler<TaskEvent> {
     totalFragmentNum++;
   }
 
-  public void addFragments(Collection<FileFragment> fragments) {
-    for (FileFragment eachFragment: fragments) {
+  public void addFragments(Collection<Fragment> fragments) {
+    for (Fragment eachFragment: fragments) {
       addFragment(eachFragment, false);
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 55b1895..a240ace 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -45,15 +45,18 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAg
 import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
 import org.apache.tajo.master.TaskSchedulerContext;
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
+import org.apache.tajo.plan.logical.SortNode.SortPurpose;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.FileStorageManager;
 import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.Pair;
+import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.TajoIdUtils;
 import org.apache.tajo.worker.FetchImpl;
@@ -83,18 +86,20 @@ public class Repartitioner {
     MasterPlan masterPlan = subQuery.getMasterPlan();
     ExecutionBlock execBlock = subQuery.getBlock();
     QueryMasterTask.QueryMasterTaskContext masterContext = subQuery.getContext();
-    StorageManager storageManager = subQuery.getStorageManager();
 
     ScanNode[] scans = execBlock.getScanNodes();
 
     Path tablePath;
-    FileFragment[] fragments = new FileFragment[scans.length];
+    Fragment[] fragments = new Fragment[scans.length];
     long[] stats = new long[scans.length];
 
     // initialize variables from the child operators
     for (int i = 0; i < scans.length; i++) {
       TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName());
       if (tableDesc == null) { // if it is a real table stored on storage
+        FileStorageManager storageManager =
+            (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf());
+
         tablePath = storageManager.getTablePath(scans[i].getTableName());
         if (execBlock.getUnionScanMap() != null && !execBlock.getUnionScanMap().isEmpty()) {
           for (Map.Entry<ExecutionBlockId, ExecutionBlockId> unionScanEntry: execBlock.getUnionScanMap().entrySet()) {
@@ -107,21 +112,23 @@ public class Repartitioner {
         }
         fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
       } else {
-        tablePath = new Path(tableDesc.getPath());
         try {
           stats[i] = GlobalPlanner.computeDescendentVolume(scans[i]);
         } catch (PlanningException e) {
           throw new IOException(e);
         }
 
+        StorageManager storageManager =
+            StorageManager.getStorageManager(subQuery.getContext().getConf(), tableDesc.getMeta().getStoreType());
+
         // if table has no data, storageManager will return empty FileFragment.
         // So, we need to handle FileFragment by its size.
         // If we don't check its size, it can cause IndexOutOfBoundsException.
-        List<FileFragment> fileFragments = storageManager.getSplits(scans[i].getCanonicalName(), tableDesc.getMeta(), tableDesc.getSchema(), tablePath);
+        List<Fragment> fileFragments = storageManager.getSplits(scans[i].getCanonicalName(), tableDesc);
         if (fileFragments.size() > 0) {
           fragments[i] = fileFragments.get(0);
         } else {
-          fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
+          fragments[i] = new FileFragment(scans[i].getCanonicalName(), new Path(tableDesc.getPath()), 0, 0, new String[]{UNKNOWN_HOST});
         }
       }
     }
@@ -268,14 +275,14 @@ public class Repartitioner {
         //select intermediate scan and stats
         ScanNode[] intermediateScans = new ScanNode[largeScanIndexList.size()];
         long[] intermediateScanStats = new long[largeScanIndexList.size()];
-        FileFragment[] intermediateFragments = new FileFragment[largeScanIndexList.size()];
+        Fragment[] intermediateFragments = new Fragment[largeScanIndexList.size()];
         int index = 0;
         for (Integer eachIdx : largeScanIndexList) {
           intermediateScans[index] = scans[eachIdx];
           intermediateScanStats[index] = stats[eachIdx];
           intermediateFragments[index++] = fragments[eachIdx];
         }
-        FileFragment[] broadcastFragments = new FileFragment[broadcastIndexList.size()];
+        Fragment[] broadcastFragments = new Fragment[broadcastIndexList.size()];
         ScanNode[] broadcastScans = new ScanNode[broadcastIndexList.size()];
         index = 0;
         for (Integer eachIdx : broadcastIndexList) {
@@ -309,9 +316,9 @@ public class Repartitioner {
                                                        SubQuery subQuery,
                                                        ScanNode[] scans,
                                                        long[] stats,
-                                                       FileFragment[] fragments,
+                                                       Fragment[] fragments,
                                                        ScanNode[] broadcastScans,
-                                                       FileFragment[] broadcastFragments) throws IOException {
+                                                       Fragment[] broadcastFragments) throws IOException {
     MasterPlan masterPlan = subQuery.getMasterPlan();
     ExecutionBlock execBlock = subQuery.getBlock();
     // The hash map is modeling as follows:
@@ -394,7 +401,7 @@ public class Repartitioner {
     int joinTaskNum = Math.min(maxTaskNum, hashEntries.size());
     LOG.info("The determined number of join tasks is " + joinTaskNum);
 
-    List<FileFragment> rightFragments = new ArrayList<FileFragment>();
+    List<Fragment> rightFragments = new ArrayList<Fragment>();
     rightFragments.add(fragments[1]);
 
     if (broadcastFragments != null) {
@@ -404,14 +411,19 @@ public class Repartitioner {
         Path[] partitionScanPaths = null;
         TableDesc tableDesc = masterContext.getTableDescMap().get(eachScan.getCanonicalName());
         if (eachScan.getType() == NodeType.PARTITIONS_SCAN) {
+          FileStorageManager storageManager =
+              (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf());
+
           PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)eachScan;
           partitionScanPaths = partitionScan.getInputPaths();
           // set null to inputPaths in getFragmentsFromPartitionedTable()
-          getFragmentsFromPartitionedTable(subQuery.getStorageManager(), eachScan, tableDesc);
+          getFragmentsFromPartitionedTable(storageManager, eachScan, tableDesc);
           partitionScan.setInputPaths(partitionScanPaths);
         } else {
-          Collection<FileFragment> scanFragments = subQuery.getStorageManager().getSplits(eachScan.getCanonicalName(),
-              tableDesc.getMeta(), tableDesc.getSchema(), new Path(tableDesc.getPath()));
+          StorageManager storageManager = StorageManager.getStorageManager(subQuery.getContext().getConf(),
+              tableDesc.getMeta().getStoreType());
+          Collection<Fragment> scanFragments = storageManager.getSplits(eachScan.getCanonicalName(),
+              tableDesc, eachScan);
           if (scanFragments != null) {
             rightFragments.addAll(scanFragments);
           }
@@ -480,10 +492,10 @@ public class Repartitioner {
   /**
    * It creates a number of fragments for all partitions.
    */
-  public static List<FileFragment> getFragmentsFromPartitionedTable(StorageManager sm,
+  public static List<Fragment> getFragmentsFromPartitionedTable(FileStorageManager sm,
                                                                           ScanNode scan,
                                                                           TableDesc table) throws IOException {
-    List<FileFragment> fragments = Lists.newArrayList();
+    List<Fragment> fragments = Lists.newArrayList();
     PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan;
     fragments.addAll(sm.getSplits(
         scan.getCanonicalName(), table.getMeta(), table.getSchema(), partitionsScan.getInputPaths()));
@@ -492,7 +504,7 @@ public class Repartitioner {
   }
 
   private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext schedulerContext, SubQuery subQuery,
-                                                          int baseScanId, FileFragment[] fragments) throws IOException {
+                                                          int baseScanId, Fragment[] fragments) throws IOException {
     ExecutionBlock execBlock = subQuery.getBlock();
     ScanNode[] scans = execBlock.getScanNodes();
 
@@ -511,23 +523,27 @@ public class Repartitioner {
     //     . add all partition paths to node's inputPaths variable
     //  -> SCAN
     //     . add all fragments to broadcastFragments
-    Collection<FileFragment> baseFragments = null;
-    List<FileFragment> broadcastFragments = new ArrayList<FileFragment>();
+    Collection<Fragment> baseFragments = null;
+    List<Fragment> broadcastFragments = new ArrayList<Fragment>();
     for (int i = 0; i < scans.length; i++) {
       ScanNode scan = scans[i];
       TableDesc desc = subQuery.getContext().getTableDescMap().get(scan.getCanonicalName());
       TableMeta meta = desc.getMeta();
 
-      Collection<FileFragment> scanFragments;
+      Collection<Fragment> scanFragments;
       Path[] partitionScanPaths = null;
       if (scan.getType() == NodeType.PARTITIONS_SCAN) {
         PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)scan;
         partitionScanPaths = partitionScan.getInputPaths();
         // set null to inputPaths in getFragmentsFromPartitionedTable()
-        scanFragments = getFragmentsFromPartitionedTable(subQuery.getStorageManager(), scan, desc);
+        FileStorageManager storageManager =
+            (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf());
+        scanFragments = getFragmentsFromPartitionedTable(storageManager, scan, desc);
       } else {
-        scanFragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(),
-            new Path(desc.getPath()));
+        StorageManager storageManager =
+            StorageManager.getStorageManager(subQuery.getContext().getConf(), desc.getMeta().getStoreType());
+
+        scanFragments = storageManager.getSplits(scan.getCanonicalName(), desc, scan);
       }
 
       if (scanFragments != null) {
@@ -630,46 +646,66 @@ public class Repartitioner {
     ExecutionBlock execBlock = subQuery.getBlock();
     ScanNode scan = execBlock.getScanNodes()[0];
     Path tablePath;
-    tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableName());
+    tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf()))
+        .getTablePath(scan.getTableName());
 
     ExecutionBlock sampleChildBlock = masterPlan.getChild(subQuery.getId(), 0);
     SortNode sortNode = PlannerUtil.findTopNode(sampleChildBlock.getPlan(), NodeType.SORT);
     SortSpec [] sortSpecs = sortNode.getSortKeys();
     Schema sortSchema = new Schema(channel.getShuffleKeys());
 
+    TupleRange[] ranges;
+    int determinedTaskNum;
+
     // calculate the number of maximum query ranges
     TableStats totalStat = computeChildBlocksStats(subQuery.getContext(), masterPlan, subQuery.getId());
 
     // If there is an empty table in inner join, it should return zero rows.
-    if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() == 0 ) {
+    if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() == 0) {
       return;
     }
     TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats(), false);
-    RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs);
-    BigInteger card = partitioner.getTotalCardinality();
 
-    // if the number of the range cardinality is less than the desired number of tasks,
-    // we set the the number of tasks to the number of range cardinality.
-    int determinedTaskNum;
-    if (card.compareTo(BigInteger.valueOf(maxNum)) < 0) {
-      LOG.info(subQuery.getId() + ", The range cardinality (" + card
-          + ") is less then the desired number of tasks (" + maxNum + ")");
-      determinedTaskNum = card.intValue();
+    if (sortNode.getSortPurpose() == SortPurpose.STORAGE_SPECIFIED) {
+      StoreType storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan());
+      CatalogService catalog = subQuery.getContext().getQueryMasterContext().getWorkerContext().getCatalog();
+      LogicalRootNode rootNode = masterPlan.getLogicalPlan().getRootBlock().getRoot();
+      TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild());
+      if (tableDesc == null) {
+        throw new IOException("Can't get table meta data from catalog: " +
+            PlannerUtil.getStoreTableName(masterPlan.getLogicalPlan()));
+      }
+      ranges = StorageManager.getStorageManager(subQuery.getContext().getConf(), storeType)
+          .getInsertSortRanges(subQuery.getContext().getQueryContext(), tableDesc,
+              sortNode.getInSchema(), sortSpecs,
+              mergedRange);
+      determinedTaskNum = ranges.length;
     } else {
-      determinedTaskNum = maxNum;
-    }
+      RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs);
+      BigInteger card = partitioner.getTotalCardinality();
+
+      // if the number of the range cardinality is less than the desired number of tasks,
+      // we set the the number of tasks to the number of range cardinality.
+      if (card.compareTo(BigInteger.valueOf(maxNum)) < 0) {
+        LOG.info(subQuery.getId() + ", The range cardinality (" + card
+            + ") is less then the desired number of tasks (" + maxNum + ")");
+        determinedTaskNum = card.intValue();
+      } else {
+        determinedTaskNum = maxNum;
+      }
 
-    LOG.info(subQuery.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum +
-        " sub ranges (total units: " + determinedTaskNum + ")");
-    TupleRange [] ranges = partitioner.partition(determinedTaskNum);
-    if (ranges == null || ranges.length == 0) {
-      LOG.warn(subQuery.getId() + " no range infos.");
-    }
-    TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges);
-    if (LOG.isDebugEnabled()) {
-      if (ranges != null) {
-        for (TupleRange eachRange : ranges) {
-          LOG.debug(subQuery.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd());
+      LOG.info(subQuery.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum +
+          " sub ranges (total units: " + determinedTaskNum + ")");
+      ranges = partitioner.partition(determinedTaskNum);
+      if (ranges == null || ranges.length == 0) {
+        LOG.warn(subQuery.getId() + " no range infos.");
+      }
+      TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges);
+      if (LOG.isDebugEnabled()) {
+        if (ranges != null) {
+          for (TupleRange eachRange : ranges) {
+            LOG.debug(subQuery.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd());
+          }
         }
       }
     }
@@ -772,14 +808,15 @@ public class Repartitioner {
 
   public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan,
                                                  SubQuery subQuery, DataChannel channel,
-                                                 int maxNum) {
+                                                 int maxNum) throws IOException {
     ExecutionBlock execBlock = subQuery.getBlock();
     ScanNode scan = execBlock.getScanNodes()[0];
     Path tablePath;
-    tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableName());
+    tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf()))
+        .getTablePath(scan.getTableName());
 
-    FileFragment frag = new FileFragment(scan.getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
-    List<FileFragment> fragments = new ArrayList<FileFragment>();
+    Fragment frag = new FileFragment(scan.getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
+    List<Fragment> fragments = new ArrayList<Fragment>();
     fragments.add(frag);
     SubQuery.scheduleFragments(subQuery, fragments);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 39bb7ed..7f05fa4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -36,7 +36,7 @@ import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.statistics.ColumnStats;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -59,10 +59,11 @@ import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttem
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
 import org.apache.tajo.master.container.TajoContainer;
 import org.apache.tajo.master.container.TajoContainerId;
+import org.apache.tajo.storage.FileStorageManager;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.history.QueryUnitHistory;
@@ -96,7 +97,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   private TableStats resultStatistics;
   private TableStats inputStatistics;
   private EventHandler<Event> eventHandler;
-  private final StorageManager sm;
   private AbstractTaskScheduler taskScheduler;
   private QueryMasterTask.QueryMasterTaskContext context;
   private final List<String> diagnostics = new ArrayList<String>();
@@ -286,12 +286,10 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   private AtomicInteger completeReportReceived = new AtomicInteger(0);
   private SubQueryHistory finalSubQueryHistory;
 
-  public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan,
-                  ExecutionBlock block, StorageManager sm) {
+  public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block) {
     this.context = context;
     this.masterPlan = masterPlan;
     this.block = block;
-    this.sm = sm;
     this.eventHandler = context.getEventHandler();
 
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -509,10 +507,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     return this.priority;
   }
 
-  public StorageManager getStorageManager() {
-    return sm;
-  }
-  
   public ExecutionBlockId getId() {
     return block.getId();
   }
@@ -677,14 +671,14 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     }
 
     DataChannel channel = masterPlan.getOutgoingChannels(getId()).get(0);
-    // get default or store type
-    CatalogProtos.StoreType storeType = CatalogProtos.StoreType.CSV; // default setting
 
     // if store plan (i.e., CREATE or INSERT OVERWRITE)
-    StoreTableNode storeTableNode = PlannerUtil.findTopNode(getBlock().getPlan(), NodeType.STORE);
-    if (storeTableNode != null) {
-      storeType = storeTableNode.getStorageType();
+    StoreType storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan());
+    if (storeType == null) {
+      // get default or store type
+      storeType = StoreType.CSV;
     }
+
     schema = channel.getSchema();
     meta = CatalogUtil.newTableMeta(storeType, new KeyValueSet());
     inputStatistics = statsArray[0];
@@ -1043,7 +1037,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       ScanNode scan = scans[0];
       TableDesc table = subQuery.context.getTableDescMap().get(scan.getCanonicalName());
 
-      Collection<FileFragment> fragments;
+      Collection<Fragment> fragments;
       TableMeta meta = table.getMeta();
 
       // Depending on scanner node's type, it creates fragments. If scan is for
@@ -1052,10 +1046,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       // span a number of blocks or possibly consists of a number of files.
       if (scan.getType() == NodeType.PARTITIONS_SCAN) {
         // After calling this method, partition paths are removed from the physical plan.
-        fragments = Repartitioner.getFragmentsFromPartitionedTable(subQuery.getStorageManager(), scan, table);
+        FileStorageManager storageManager =
+            (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf());
+        fragments = Repartitioner.getFragmentsFromPartitionedTable(storageManager, scan, table);
       } else {
-        Path inputPath = new Path(table.getPath());
-        fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, table.getSchema(), inputPath);
+        StorageManager storageManager =
+            StorageManager.getStorageManager(subQuery.getContext().getConf(), meta.getStoreType());
+        fragments = storageManager.getSplits(scan.getCanonicalName(), table, scan);
       }
 
       SubQuery.scheduleFragments(subQuery, fragments);
@@ -1073,27 +1070,27 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     }
   }
 
-  public static void scheduleFragment(SubQuery subQuery, FileFragment fragment) {
+  public static void scheduleFragment(SubQuery subQuery, Fragment fragment) {
     subQuery.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
         subQuery.getId(), fragment));
   }
 
 
-  public static void scheduleFragments(SubQuery subQuery, Collection<FileFragment> fragments) {
-    for (FileFragment eachFragment : fragments) {
+  public static void scheduleFragments(SubQuery subQuery, Collection<Fragment> fragments) {
+    for (Fragment eachFragment : fragments) {
       scheduleFragment(subQuery, eachFragment);
     }
   }
 
-  public static void scheduleFragments(SubQuery subQuery, Collection<FileFragment> leftFragments,
-                                       Collection<FileFragment> broadcastFragments) {
-    for (FileFragment eachLeafFragment : leftFragments) {
+  public static void scheduleFragments(SubQuery subQuery, Collection<Fragment> leftFragments,
+                                       Collection<Fragment> broadcastFragments) {
+    for (Fragment eachLeafFragment : leftFragments) {
       scheduleFragment(subQuery, eachLeafFragment, broadcastFragments);
     }
   }
 
   public static void scheduleFragment(SubQuery subQuery,
-                                      FileFragment leftFragment, Collection<FileFragment> rightFragments) {
+                                      Fragment leftFragment, Collection<Fragment> rightFragments) {
     subQuery.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
         subQuery.getId(), leftFragment, rightFragments));
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 0cc87fc..f1a9224 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -39,6 +39,7 @@ import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.querymaster.QueryInProgress;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.util.ApplicationIdUtils;
+import org.apache.tajo.util.StringUtils;
 
 import java.io.IOException;
 import java.util.*;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
index 0de1b2b..3147bb6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
@@ -38,7 +38,7 @@ public class IndexUtil {
   public static String getIndexNameOfFrag(FileFragment fragment, SortSpec[] keys) {
     StringBuilder builder = new StringBuilder(); 
     builder.append(fragment.getPath().getName() + "_");
-    builder.append(fragment.getStartKey() + "_" + fragment.getEndKey() + "_");
+    builder.append(fragment.getStartKey() + "_" + fragment.getLength() + "_");
     for(int i = 0 ; i < keys.length ; i ++) {
       builder.append(keys[i].getSortKey().getSimpleName()+"_");
     }