You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/10/29 11:25:51 UTC

[3/3] git commit: TAJO-287: Improve Fragment to be more generic. (hyunsik)

TAJO-287: Improve Fragment to be more generic. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/3c22d3eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/3c22d3eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/3c22d3eb

Branch: refs/heads/master
Commit: 3c22d3eba437e9130ee74de8f935c7fc330b1217
Parents: 2f09450
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Oct 28 21:46:34 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Tue Oct 29 19:20:00 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../tajo/catalog/json/TableMetaAdapter.java     |   1 +
 .../src/main/proto/CatalogProtos.proto          |   5 +
 .../org/apache/tajo/client/ResultSetImpl.java   |  12 +-
 .../engine/planner/PhysicalPlannerImpl.java     |  30 ++-
 .../planner/physical/BSTIndexScanExec.java      |   3 +-
 .../planner/physical/ExternalSortExec.java      |  10 +-
 .../engine/planner/physical/SeqScanExec.java    |  13 +-
 .../tajo/engine/query/QueryUnitRequest.java     |   6 +-
 .../tajo/engine/query/QueryUnitRequestImpl.java |  20 +-
 .../apache/tajo/master/TaskSchedulerImpl.java   |   8 +-
 .../tajo/master/event/SubQuerySucceeEvent.java  |   2 +
 .../tajo/master/querymaster/QueryUnit.java      |  26 +-
 .../tajo/master/querymaster/Repartitioner.java  |  24 +-
 .../tajo/master/querymaster/SubQuery.java       |   8 +-
 .../java/org/apache/tajo/util/IndexUtil.java    |   6 +-
 .../main/java/org/apache/tajo/worker/Task.java  |  47 +---
 .../apache/tajo/worker/TaskAttemptContext.java  |  44 ++--
 .../planner/physical/TestBNLJoinExec.java       |  13 +-
 .../planner/physical/TestBSTIndexExec.java      |  12 +-
 .../planner/physical/TestExternalSortExec.java  |   5 +-
 .../physical/TestFullOuterHashJoinExec.java     |  25 +-
 .../physical/TestFullOuterMergeJoinExec.java    |  37 +--
 .../planner/physical/TestHashAntiJoinExec.java  |   7 +-
 .../planner/physical/TestHashJoinExec.java      |   7 +-
 .../planner/physical/TestHashSemiJoinExec.java  |   7 +-
 .../physical/TestLeftOuterHashJoinExec.java     |  31 +--
 .../physical/TestLeftOuterNLJoinExec.java       |  36 +--
 .../planner/physical/TestMergeJoinExec.java     |   7 +-
 .../engine/planner/physical/TestNLJoinExec.java |  13 +-
 .../planner/physical/TestPhysicalPlanner.java   |  95 +++----
 .../physical/TestRightOuterHashJoinExec.java    |  24 +-
 .../physical/TestRightOuterMergeJoinExec.java   |  37 +--
 .../engine/planner/physical/TestSortExec.java   |   5 +-
 .../apache/tajo/storage/TestFileFragment.java   |  97 +++++++
 .../org/apache/tajo/storage/TestFragment.java   | 100 --------
 .../org/apache/tajo/storage/TestRowFile.java    |   7 +-
 .../tajo/worker/TestRangeRetrieverHandler.java  |   9 +-
 .../tajo/storage/AbstractStorageManager.java    | 116 +++++----
 .../java/org/apache/tajo/storage/CSVFile.java   |  17 +-
 .../org/apache/tajo/storage/FileAppender.java   |   2 +-
 .../org/apache/tajo/storage/FileScanner.java    |   5 +-
 .../java/org/apache/tajo/storage/Fragment.java  | 249 ------------------
 .../org/apache/tajo/storage/MergeScanner.java   |  23 +-
 .../java/org/apache/tajo/storage/RawFile.java   |  13 +-
 .../java/org/apache/tajo/storage/RowFile.java   |  15 +-
 .../java/org/apache/tajo/storage/Storage.java   |   3 +-
 .../org/apache/tajo/storage/StorageManager.java |   3 +-
 .../tajo/storage/StorageManagerFactory.java     |   5 +-
 .../tajo/storage/fragment/FileFragment.java     | 250 +++++++++++++++++++
 .../apache/tajo/storage/fragment/Fragment.java  |  31 +++
 .../storage/fragment/FragmentConvertor.java     | 123 +++++++++
 .../tajo/storage/rcfile/RCFileWrapper.java      |  19 +-
 .../tajo/storage/trevni/TrevniAppender.java     |   4 +-
 .../tajo/storage/trevni/TrevniScanner.java      |   6 +-
 .../apache/tajo/storage/v2/CSVFileScanner.java  |  17 +-
 .../apache/tajo/storage/v2/FileScannerV2.java   |  14 +-
 .../apache/tajo/storage/v2/RCFileScanner.java   |  14 +-
 .../tajo/storage/v2/StorageManagerV2.java       |   4 +-
 .../src/main/resources/storage-default.xml      |  26 +-
 .../tajo/storage/TestCompressionStorages.java   |  11 +-
 .../apache/tajo/storage/TestMergeScanner.java   |   9 +-
 .../apache/tajo/storage/TestStorageManager.java |   2 +-
 .../org/apache/tajo/storage/TestStorages.java   |  11 +-
 .../apache/tajo/storage/index/TestBSTIndex.java |  65 ++---
 .../index/TestSingleCSVFileBSTIndex.java        |  13 +-
 .../apache/tajo/storage/rcfile/TestRCFile.java  |   2 +-
 .../tajo/storage/v2/TestCSVCompression.java     |  11 +-
 .../apache/tajo/storage/v2/TestCSVScanner.java  |   5 +-
 .../apache/tajo/storage/v2/TestStorages.java    |  11 +-
 .../src/test/resources/storage-default.xml      |  37 ++-
 71 files changed, 1117 insertions(+), 860 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3f309cc..9f14a4b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -59,6 +59,8 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-287: Improve Fragment to be more generic. (hyunsik)
+
     TAJO-274: Maintaining connectivity to Tajo master regardless of the restart
     of the Tajo master. (Keuntae Park via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/json/TableMetaAdapter.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/json/TableMetaAdapter.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/json/TableMetaAdapter.java
index a3337c8..ce42bea 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/json/TableMetaAdapter.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/json/TableMetaAdapter.java
@@ -20,6 +20,7 @@ package org.apache.tajo.catalog.json;
 
 import com.google.gson.*;
 import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.json.GsonSerDerAdapter;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index 3de5409..9c57bf3 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -82,6 +82,11 @@ message KeyValueSetProto {
 }
 
 message FragmentProto {
+  required string id = 1;
+  required bytes contents = 2;
+}
+
+message FileFragmentProto {
 	required string id = 1;
 	required string path = 2;
 	required int64 startOffset = 3;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/ResultSetImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/ResultSetImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/ResultSetImpl.java
index b713f71..bfe0cdc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/ResultSetImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/ResultSetImpl.java
@@ -31,7 +31,7 @@ import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.MergeScanner;
 import org.apache.tajo.storage.Scanner;
 import org.apache.tajo.storage.Tuple;
@@ -72,8 +72,8 @@ public class ResultSetImpl implements ResultSet {
     if(desc != null) {
       fs = desc.getPath().getFileSystem(conf);
       this.totalRow = desc.getStats() != null ? desc.getStats().getNumRows() : 0;
-      Collection<Fragment> frags = getFragments(desc.getMeta(), desc.getPath());
-      scanner = new MergeScanner(conf, desc.getMeta(), schema, frags);
+      Collection<FileFragment> frags = getFragments(desc.getMeta(), desc.getPath());
+      scanner = new MergeScanner(conf, schema, desc.getMeta(), frags);
     }
     init();
   }
@@ -91,9 +91,9 @@ public class ResultSetImpl implements ResultSet {
     }
   }
 
-  private Collection<Fragment> getFragments(TableMeta meta, Path tablePath)
+  private Collection<FileFragment> getFragments(TableMeta meta, Path tablePath)
       throws IOException {
-    List<Fragment> fraglist = Lists.newArrayList();
+    List<FileFragment> fraglist = Lists.newArrayList();
     FileStatus[] files = fs.listStatus(tablePath, new PathFilter() {
       @Override
       public boolean accept(Path path) {
@@ -107,7 +107,7 @@ public class ResultSetImpl implements ResultSet {
       if (files[i].getLen() == 0) {
         continue;
       }
-      fraglist.add(new Fragment(tbname + "_" + i, files[i].getPath(), 0l, files[i].getLen()));
+      fraglist.add(new FileFragment(tbname + "_" + i, files[i].getPath(), 0l, files[i].getLen()));
     }
     return fraglist;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 4a3940c..db58e32 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -27,6 +27,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.SortSpec;
@@ -37,14 +40,15 @@ import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.physical.*;
 import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.storage.AbstractStorageManager;
-import org.apache.tajo.storage.Fragment;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.util.IndexUtil;
 import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm;
@@ -177,12 +181,14 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     }
   }
 
-  private long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) {
+  private long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) throws IOException {
     long size = 0;
     for (String tableId : tableIds) {
-      Fragment[] fragments = ctx.getTables(tableId);
-      for (Fragment frag : fragments) {
-        size += frag.getLength();
+      // TODO - CSV is a hack.
+      List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), CatalogProtos.StoreType.CSV,
+          ctx.getTables(tableId));
+      for (FileFragment frag : fragments) {
+        size += frag.getEndKey();
       }
     }
     return size;
@@ -636,7 +642,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     Preconditions.checkNotNull(ctx.getTable(scanNode.getCanonicalName()),
         "Error: There is no table matched to %s", scanNode.getCanonicalName() + "(" + scanNode.getTableName() + ")");
 
-    Fragment[] fragments = ctx.getTables(scanNode.getCanonicalName());
+    FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
     return new SeqScanExec(ctx, sm, scanNode, fragments);
   }
 
@@ -751,17 +757,17 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     Preconditions.checkNotNull(ctx.getTable(annotation.getCanonicalName()),
         "Error: There is no table matched to %s", annotation.getCanonicalName());
 
-    Fragment[] fragments = ctx.getTables(annotation.getTableName());
+    FragmentProto [] fragmentProtos = ctx.getTables(annotation.getTableName());
+    List<FileFragment> fragments =
+        FragmentConvertor.convert(ctx.getConf(), CatalogProtos.StoreType.CSV, fragmentProtos);
 
-    String indexName = IndexUtil.getIndexNameOfFrag(fragments[0],
-        annotation.getSortKeys());
+    String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys());
     Path indexPath = new Path(sm.getTablePath(annotation.getTableName()), "index");
 
     TupleComparator comp = new TupleComparator(annotation.getKeySchema(),
         annotation.getSortKeys());
-    return new BSTIndexScanExec(ctx, sm, annotation, fragments[0], new Path(
-        indexPath, indexName), annotation.getKeySchema(), comp,
-        annotation.getDatum());
+    return new BSTIndexScanExec(ctx, sm, annotation, fragments.get(0), new Path(indexPath, indexName),
+        annotation.getKeySchema(), comp, annotation.getDatum());
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 2d81180..5de6634 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.datum.Datum;
@@ -48,7 +49,7 @@ public class BSTIndexScanExec extends PhysicalExec {
   
   public BSTIndexScanExec(TaskAttemptContext context,
                           AbstractStorageManager sm , ScanNode scanNode ,
-       Fragment fragment, Path fileName , Schema keySchema,
+       FileFragment fragment, Path fileName , Schema keySchema,
        TupleComparator comparator , Datum[] datum) throws IOException {
     super(context, scanNode.getInSchema(), scanNode.getOutSchema());
     this.scanNode = scanNode;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 99df92e..0a8cb62 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -75,7 +75,7 @@ public class ExternalSortExec extends SortExec {
     // So, I add the scheme 'file:/' to path. But, it should be improved.
     Path localPath = new Path(sortTmpDir + "/0_" + chunkId);
 
-    appender = new RawFile.RawFileAppender(context.getConf(), meta, inSchema, localPath);
+    appender = new RawFile.RawFileAppender(context.getConf(), inSchema, meta, localPath);
     appender.init();
 
     for (Tuple t : tupleSlots) {
@@ -147,7 +147,7 @@ public class ExternalSortExec extends SortExec {
             Path leftChunk = getChunkPath(level, chunkId);
             Path rightChunk = getChunkPath(level, chunkId + 1);
 
-            appender = new RawFile.RawFileAppender(context.getConf(), meta, inSchema, nextChunk);
+            appender = new RawFile.RawFileAppender(context.getConf(), inSchema, meta, nextChunk);
             appender.init();
             merge(appender, leftChunk, rightChunk);
 
@@ -167,7 +167,7 @@ public class ExternalSortExec extends SortExec {
       }
 
       Path result = getChunkPath(level, 0);
-      this.result = new RawFile.RawFileScanner(context.getConf(), meta, plan.getInSchema(), result);
+      this.result = new RawFile.RawFileScanner(context.getConf(), plan.getInSchema(), meta, result);
       sorted = true;
     }
 
@@ -176,10 +176,10 @@ public class ExternalSortExec extends SortExec {
 
   private void merge(RawFile.RawFileAppender appender, Path left, Path right)
       throws IOException {
-    RawFile.RawFileScanner leftScan = new RawFile.RawFileScanner(context.getConf(), meta, plan.getInSchema(), left);
+    RawFile.RawFileScanner leftScan = new RawFile.RawFileScanner(context.getConf(), plan.getInSchema(), meta, left);
 
     RawFile.RawFileScanner rightScan =
-        new RawFile.RawFileScanner(context.getConf(), meta, plan.getInSchema(), right);
+        new RawFile.RawFileScanner(context.getConf(), plan.getInSchema(), meta, right);
 
     Tuple leftTuple = leftScan.next();
     Tuple rightTuple = rightScan.next();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 2045602..22af472 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -18,6 +18,9 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
@@ -28,7 +31,6 @@ import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.storage.*;
-import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
 import java.util.HashSet;
@@ -41,13 +43,13 @@ public class SeqScanExec extends PhysicalExec {
   private EvalNode qual = null;
   private EvalContext qualCtx;
 
-  private Fragment [] fragments;
+  private CatalogProtos.FragmentProto [] fragments;
 
   private Projector projector;
   private EvalContext [] evalContexts;
 
   public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm,
-                     ScanNode plan, Fragment [] fragments) throws IOException {
+                     ScanNode plan, CatalogProtos.FragmentProto [] fragments) throws IOException {
     super(context, plan.getInSchema(), plan.getOutSchema());
 
     this.plan = plan;
@@ -88,8 +90,9 @@ public class SeqScanExec extends PhysicalExec {
     this.evalContexts = projector.renew();
 
     if (fragments.length > 1) {
-      this.scanner = new MergeScanner(context.getConf(), plan.getTableDesc().getMeta(), plan.getTableSchema(),
-          TUtil.newList(fragments));
+      this.scanner = new MergeScanner(context.getConf(), plan.getTableSchema(), plan.getTableDesc().getMeta(),
+          FragmentConvertor.<FileFragment>convert(context.getConf(), plan.getTableDesc().getMeta().getStoreType(),
+              fragments));
     } else {
       this.scanner = StorageManagerFactory.getStorageManager(
           context.getConf()).getScanner(plan.getTableDesc().getMeta(), plan.getTableSchema(), fragments[0], projected);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
index edc5f3d..383a787 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
@@ -21,12 +21,12 @@
  */
 package org.apache.tajo.engine.query;
 
-import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.storage.Fragment;
 
 import java.net.URI;
 import java.util.List;
@@ -34,7 +34,7 @@ import java.util.List;
 public interface QueryUnitRequest extends ProtoObject<TajoWorkerProtocol.QueryUnitRequestProto> {
 
 	public QueryUnitAttemptId getId();
-	public List<Fragment> getFragments();
+	public List<CatalogProtos.FragmentProto> getFragments();
 	public String getOutputTableId();
 	public boolean isClusteredOutput();
 	public String getSerializedData();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
index f9cc550..3c3c3dd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
@@ -18,22 +18,24 @@
 
 package org.apache.tajo.engine.query;
 
-import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.ipc.TajoWorkerProtocol.Fetch;
 import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
 import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProtoOrBuilder;
-import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.fragment.Fragment;
 
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
 public class QueryUnitRequestImpl implements QueryUnitRequest {
 	
   private QueryUnitAttemptId id;
-  private List<Fragment> fragments;
+  private List<FragmentProto> fragments;
   private String outputTable;
 	private boolean isUpdated;
 	private boolean clusteredOutput;
@@ -55,7 +57,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
 		this.isUpdated = false;
 	}
 	
-	public QueryUnitRequestImpl(QueryUnitAttemptId id, List<Fragment> fragments,
+	public QueryUnitRequestImpl(QueryUnitAttemptId id, List<FragmentProto> fragments,
 			String outputTable, boolean clusteredOutput,
 			String serializedData, QueryContext queryContext, DataChannel channel, Enforcer enforcer) {
 		this();
@@ -69,7 +71,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
 		isUpdated = false;
 	}
 	
-	public void set(QueryUnitAttemptId id, List<Fragment> fragments,
+	public void set(QueryUnitAttemptId id, List<FragmentProto> fragments,
 			String outputTable, boolean clusteredOutput,
 			String serializedData, QueryContext queryContext, DataChannel dataChannel, Enforcer enforcer) {
 		this.id = id;
@@ -106,16 +108,16 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
 	}
 
 	@Override
-	public List<Fragment> getFragments() {
+	public List<FragmentProto> getFragments() {
 		QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
 		if (fragments != null) {
 			return fragments;
 		}
 		if (fragments == null) {
-			fragments = new ArrayList<Fragment>();
+			fragments = new ArrayList<FragmentProto>();
 		}
 		for (int i = 0; i < p.getFragmentsCount(); i++) {
-			fragments.add(new Fragment(p.getFragments(i)));
+			fragments.add(p.getFragments(i));
 		}
 		return this.fragments;
 	}
@@ -284,7 +286,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
 		}
 		if (fragments != null) {
 			for (int i = 0; i < fragments.size(); i++) {
-				builder.addFragments(fragments.get(i).getProto());
+				builder.addFragments(fragments.get(i));
 			}
 		}
 		if (this.outputTable != null) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index e09dd69..1d53c87 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -41,7 +42,8 @@ import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.master.querymaster.QueryUnit;
 import org.apache.tajo.master.querymaster.SubQuery;
-import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.NetUtils;
 
 import java.net.URI;
@@ -49,6 +51,8 @@ import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
 public class TaskSchedulerImpl extends AbstractService
     implements TaskScheduler {
   private static final Log LOG = LogFactory.getLog(TaskScheduleEvent.class);
@@ -461,7 +465,7 @@ public class TaskSchedulerImpl extends AbstractService
           QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
           QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
               attemptId,
-              new ArrayList<Fragment>(task.getAllFragments()),
+              new ArrayList<FragmentProto>(task.getAllFragments()),
               "",
               false,
               task.getLogicalPlan().toJson(),

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
index 5262a07..2485421 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
@@ -19,6 +19,8 @@
 package org.apache.tajo.master.event;
 
 import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.master.querymaster.SubQueryState;
 
 public class SubQuerySucceeEvent extends SubQueryCompletedEvent {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index ff0d120..5c6f79f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -28,12 +28,14 @@ import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.QueryUnitId;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.ipc.TajoWorkerProtocol.Partition;
 import org.apache.tajo.master.TaskState;
 import org.apache.tajo.master.event.*;
-import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.TajoIdUtils;
 
 import java.net.URI;
@@ -44,6 +46,8 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
 public class QueryUnit implements EventHandler<TaskEvent> {
   /** Class Logger */
   private static final Log LOG = LogFactory.getLog(QueryUnit.class);
@@ -54,7 +58,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 	private LogicalNode plan = null;
 	private List<ScanNode> scan;
 	
-	private Map<String, Fragment> fragMap;
+	private Map<String, FragmentProto> fragMap;
 	private Map<String, Set<URI>> fetchMap;
 	
   private List<Partition> partitions;
@@ -130,7 +134,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
     return this.isLeafTask;
   }
 
-  public void setDataLocations(Fragment fragment) {
+  public void setDataLocations(FileFragment fragment) {
     String[] hosts = fragment.getHosts();
     int[] blockCount = fragment.getHostsBlockCount();
     int[] volumeIds = fragment.getDiskIds();
@@ -176,13 +180,13 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 	}
 
   @Deprecated
-  public void setFragment(String tableId, Fragment fragment) {
-    this.fragMap.put(tableId, fragment);
+  public void setFragment(String tableId, FileFragment fragment) {
+    this.fragMap.put(tableId, fragment.getProto());
     setDataLocations(fragment);
   }
 
-  public void setFragment2(Fragment fragment) {
-    this.fragMap.put(fragment.getName(), fragment);
+  public void setFragment2(FileFragment fragment) {
+    this.fragMap.put(fragment.getTableName(), fragment.getProto());
     setDataLocations(fragment);
   }
 
@@ -220,12 +224,8 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 	  this.fetchMap.clear();
 	  this.fetchMap.putAll(fetches);
 	}
-	
-  public Fragment getFragment(String tableId) {
-    return this.fragMap.get(tableId);
-  }
 
-  public Collection<Fragment> getAllFragments() {
+  public Collection<FragmentProto> getAllFragments() {
     return fragMap.values();
   }
 	
@@ -268,7 +268,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 	@Override
 	public String toString() {
 		String str = new String(plan.getType() + " \n");
-		for (Entry<String, Fragment> e : fragMap.entrySet()) {
+		for (Entry<String, FragmentProto> e : fragMap.entrySet()) {
 		  str += e.getKey() + " : ";
       str += e.getValue() + " ";
 		}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index d0cca20..ef14c31 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -40,7 +40,7 @@ import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
 import org.apache.tajo.storage.AbstractStorageManager;
-import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.TajoIdUtils;
@@ -75,7 +75,7 @@ public class Repartitioner {
     ScanNode[] scans = execBlock.getScanNodes();
 
     Path tablePath;
-    Fragment [] fragments = new Fragment[2];
+    FileFragment[] fragments = new FileFragment[2];
     TableStats[] stats = new TableStats[2];
 
     // initialize variables from the child operators
@@ -89,7 +89,7 @@ public class Repartitioner {
 
         tablePath = storageManager.getTablePath(scans[i].getTableName());
         stats[i] = masterContext.getSubQuery(childBlocks[i].getId()).getTableStat();
-        fragments[i] = new Fragment(scans[i].getCanonicalName(), tablePath, 0, 0);
+        fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0);
       } else {
         tablePath = tableDesc.getPath();
         stats[i] = tableDesc.getStats();
@@ -198,7 +198,7 @@ public class Repartitioner {
     return tasks;
   }
 
-  private static QueryUnit [] createLeafTasksWithBroadcastTable(SubQuery subQuery, int baseScanId, Fragment broadcasted) throws IOException {
+  private static QueryUnit [] createLeafTasksWithBroadcastTable(SubQuery subQuery, int baseScanId, FileFragment broadcasted) throws IOException {
     ExecutionBlock execBlock = subQuery.getBlock();
     ScanNode[] scans = execBlock.getScanNodes();
     Preconditions.checkArgument(scans.length == 2, "Must be Join Query");
@@ -210,13 +210,13 @@ public class Repartitioner {
     meta = desc.getMeta();
 
     FileSystem fs = inputPath.getFileSystem(subQuery.getContext().getConf());
-    List<Fragment> fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(),
+    List<FileFragment> fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(),
         inputPath);
     QueryUnit queryUnit;
     List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
 
     int i = 0;
-    for (Fragment fragment : fragments) {
+    for (FileFragment fragment : fragments) {
       queryUnit = newQueryUnit(subQuery, i++, fragment);
       queryUnit.setFragment2(broadcasted);
       queryUnits.add(queryUnit);
@@ -224,7 +224,7 @@ public class Repartitioner {
     return queryUnits.toArray(new QueryUnit[queryUnits.size()]);
   }
 
-  private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, Fragment fragment) {
+  private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, FileFragment fragment) {
     ExecutionBlock execBlock = subQuery.getBlock();
     QueryUnit unit = new QueryUnit(
         QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), execBlock.isLeafBlock(),
@@ -234,7 +234,7 @@ public class Repartitioner {
     return unit;
   }
 
-  private static QueryUnit [] newEmptyJoinTask(SubQuery subQuery, Fragment [] fragments, int taskNum) {
+  private static QueryUnit [] newEmptyJoinTask(SubQuery subQuery, FileFragment[] fragments, int taskNum) {
     ExecutionBlock execBlock = subQuery.getBlock();
     QueryUnit [] tasks = new QueryUnit[taskNum];
     for (int i = 0; i < taskNum; i++) {
@@ -242,7 +242,7 @@ public class Repartitioner {
           QueryIdFactory.newQueryUnitId(subQuery.getId(), i), execBlock.isLeafBlock(),
           subQuery.getEventHandler());
       tasks[i].setLogicalPlan(execBlock.getPlan());
-      for (Fragment fragment : fragments) {
+      for (FileFragment fragment : fragments) {
         tasks[i].setFragment2(fragment);
       }
     }
@@ -342,7 +342,7 @@ public class Repartitioner {
         " sub ranges (total units: " + determinedTaskNum + ")");
     TupleRange [] ranges = partitioner.partition(determinedTaskNum);
 
-    Fragment dummyFragment = new Fragment(scan.getTableName(), tablePath, 0, 0);
+    FileFragment dummyFragment = new FileFragment(scan.getTableName(), tablePath, 0, 0);
 
     List<String> basicFetchURIs = new ArrayList<String>();
 
@@ -438,7 +438,7 @@ public class Repartitioner {
     tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableName());
 
 
-    Fragment frag = new Fragment(scan.getCanonicalName(), tablePath, 0, 0);
+    FileFragment frag = new FileFragment(scan.getCanonicalName(), tablePath, 0, 0);
 
 
     Map<String, List<IntermediateEntry>> hashedByHost;
@@ -563,7 +563,7 @@ public class Repartitioner {
   }
 
   public static QueryUnit [] createEmptyNonLeafTasks(SubQuery subQuery, int num,
-                                                     Fragment frag) {
+                                                     FileFragment frag) {
     LogicalNode plan = subQuery.getBlock().getPlan();
     QueryUnit [] tasks = new QueryUnit[num];
     for (int i = 0; i < num; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 0f1cbde..61872ab 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -53,7 +53,7 @@ import org.apache.tajo.master.TaskScheduler;
 import org.apache.tajo.master.TaskSchedulerImpl;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.storage.AbstractStorageManager;
-import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.fragment.FileFragment;
 
 import java.io.IOException;
 import java.util.*;
@@ -642,14 +642,14 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       meta = desc.getMeta();
 
       // TODO - should be change the inner directory
-      List<Fragment> fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(),
+      List<FileFragment> fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(),
           inputPath);
 
       QueryUnit queryUnit;
       List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
 
       int i = 0;
-      for (Fragment fragment : fragments) {
+      for (FileFragment fragment : fragments) {
         queryUnit = newQueryUnit(subQuery, i++, fragment);
         queryUnits.add(queryUnit);
       }
@@ -657,7 +657,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       return queryUnits.toArray(new QueryUnit[queryUnits.size()]);
     }
 
-    private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, Fragment fragment) {
+    private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, FileFragment fragment) {
       ExecutionBlock execBlock = subQuery.getBlock();
       QueryUnit unit = new QueryUnit(
           QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), execBlock.isLeafBlock(),

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
index 62bcf2d..dc77700 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
@@ -28,17 +28,17 @@ import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.planner.LogicalPlan;
 import org.apache.tajo.engine.planner.logical.IndexScanNode;
 import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.fragment.FileFragment;
 
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map.Entry;
 
 public class IndexUtil {
-  public static String getIndexNameOfFrag(Fragment fragment, SortSpec[] keys) {
+  public static String getIndexNameOfFrag(FileFragment fragment, SortSpec[] keys) {
     StringBuilder builder = new StringBuilder(); 
     builder.append(fragment.getPath().getName() + "_");
-    builder.append(fragment.getStartOffset() + "_" + fragment.getLength() + "_");
+    builder.append(fragment.getStartKey() + "_" + fragment.getEndKey() + "_");
     for(int i = 0 ; i < keys.length ; i ++) {
       builder.append(keys[i].getSortKey().getColumnName()+"_");
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 922719d..b5516f0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -45,9 +45,9 @@ import org.apache.tajo.engine.query.QueryUnitRequest;
 import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
 import org.apache.tajo.ipc.TajoWorkerProtocol.*;
 import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.storage.Fragment;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.ApplicationIdUtils;
 
 import java.io.File;
@@ -58,6 +58,8 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
 public class Task {
   private static final Log LOG = LogFactory.getLog(Task.class);
 
@@ -141,8 +143,7 @@ public class Task {
         taskId.getQueryUnitId().getId() + "_" + taskId.getId());
 
     this.context = new TaskAttemptContext(systemConf, taskId,
-        request.getFragments().toArray(new Fragment[request.getFragments().size()]),
-        taskDir);
+        request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir);
     this.context.setDataChannel(request.getDataChannel());
     this.context.setEnforcer(request.getEnforcer());
 
@@ -159,7 +160,7 @@ public class Task {
       StoreTableNode store = (StoreTableNode) plan;
       this.partitionType = store.getPartitionType();
       if (partitionType == PartitionType.RANGE_PARTITION) {
-        SortNode sortNode = (SortNode) store.getChild();
+        SortNode sortNode = store.getChild();
         this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
         this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys());
       }
@@ -181,11 +182,6 @@ public class Task {
         + (interQuery ? ", Use " + this.partitionType  + " partitioning":""));
 
     LOG.info("* Fragments (num: " + request.getFragments().size() + ")");
-    for (Fragment f: request.getFragments()) {
-      TableDesc desc = descs.get(f.getName());
-      LOG.info("Table Id:" + f.getName() + ", path:" + desc.getPath() + "(" + desc.getMeta().getStoreType() + "), " +
-          "(start:" + f.getStartOffset() + ", length: " + f.getLength() + ")");
-    }
     LOG.info("* Fetches (total:" + request.getFetches().size() + ") :");
     for (Fetch f : request.getFetches()) {
       LOG.info("Table Id: " + f.getName() + ", url: " + f.getUrls());
@@ -237,25 +233,6 @@ public class Task {
 
   public void localize(QueryUnitRequest request) throws IOException {
     fetcherRunners = getFetchRunners(context, request.getFetches());
-
-    List<Fragment> cached = Lists.newArrayList();
-    for (Fragment frag : request.getFragments()) {
-      if (frag.isDistCached()) {
-        cached.add(frag);
-      }
-    }
-
-    if (cached.size() > 0) {
-      Path inFile;
-
-      int i = fetcherRunners.size();
-      for (Fragment cache : cached) {
-        inFile = new Path(inputTableBaseDir, "in_" + i);
-        taskRunnerContext.getDefaultFS().copyToLocalFile(cache.getPath(), inFile);
-        cache.setPath(inFile);
-        i++;
-      }
-    }
   }
 
   public QueryUnitAttemptId getId() {
@@ -354,8 +331,8 @@ public class Task {
     Collection<String> inputs = Lists.newArrayList(context.getInputTables());
     for (String inputTable: inputs) {
       File tableDir = new File(context.getFetchIn(), inputTable);
-      Fragment [] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
-      context.changeFragment(inputTable, frags);
+      FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
+      context.updateAssignedFragments(inputTable, frags);
     }
   }
 
@@ -449,26 +426,26 @@ public class Task {
     return false;
   }
 
-  private Fragment[] localizeFetchedData(File file, String name, TableMeta meta)
+  private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta)
       throws IOException {
     Configuration c = new Configuration(systemConf);
     c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
     FileSystem fs = FileSystem.get(c);
     Path tablePath = new Path(file.getAbsolutePath());
 
-    List<Fragment> listTablets = new ArrayList<Fragment>();
-    Fragment tablet;
+    List<FileFragment> listTablets = new ArrayList<FileFragment>();
+    FileFragment tablet;
 
     FileStatus[] fileLists = fs.listStatus(tablePath);
     for (FileStatus f : fileLists) {
       if (f.getLen() == 0) {
         continue;
       }
-      tablet = new Fragment(name, f.getPath(), 0l, f.getLen());
+      tablet = new FileFragment(name, f.getPath(), 0l, f.getLen());
       listTablets.add(tablet);
     }
 
-    Fragment[] tablets = new Fragment[listTablets.size()];
+    FileFragment[] tablets = new FileFragment[listTablets.size()];
     listTablets.toArray(tablets);
 
     return tablets;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index f05e1a1..86d8cf2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.worker;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
@@ -29,13 +30,16 @@ import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
-import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
 
 import java.io.File;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.CountDownLatch;
 
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
 
 /**
  * Contains the information about executing subquery.
@@ -43,7 +47,7 @@ import java.util.concurrent.CountDownLatch;
 public class TaskAttemptContext {
   private static final Log LOG = LogFactory.getLog(TaskAttemptContext.class);
   private final TajoConf conf;
-  private final Map<String, List<Fragment>> fragmentMap = new HashMap<String, List<Fragment>>();
+  private final Map<String, List<FragmentProto>> fragmentMap = Maps.newHashMap();
 
   private TaskAttemptState state;
   private TableStats resultStats;
@@ -61,18 +65,18 @@ public class TaskAttemptContext {
   private Enforcer enforcer;
 
   public TaskAttemptContext(TajoConf conf, final QueryUnitAttemptId queryId,
-                            final Fragment[] fragments,
+                            final FragmentProto[] fragments,
                             final Path workDir) {
     this.conf = conf;
     this.queryId = queryId;
     
-    for(Fragment t : fragments) {
-      if (fragmentMap.containsKey(t.getName())) {
-        fragmentMap.get(t.getName()).add(t);
+    for(FragmentProto t : fragments) {
+      if (fragmentMap.containsKey(t.getId())) {
+        fragmentMap.get(t.getId()).add(t);
       } else {
-        List<Fragment> frags = new ArrayList<Fragment>();
+        List<FragmentProto> frags = new ArrayList<FragmentProto>();
         frags.add(t);
-        fragmentMap.put(t.getName(), frags);
+        fragmentMap.put(t.getId(), frags);
       }
     }
 
@@ -82,6 +86,12 @@ public class TaskAttemptContext {
     state = TaskAttemptState.TA_PENDING;
   }
 
+  @VisibleForTesting
+  public TaskAttemptContext(TajoConf conf, final QueryUnitAttemptId queryId,
+                            final Fragment [] fragments,  final Path workDir) {
+    this(conf, queryId, FragmentConvertor.toFragmentProtoArray(fragments), workDir);
+  }
+
   public TajoConf getConf() {
     return this.conf;
   }
@@ -173,15 +183,15 @@ public class TaskAttemptContext {
     return repartitions.entrySet().iterator();
   }
   
-  public void changeFragment(String tableId, Fragment [] fragments) {
+  public void updateAssignedFragments(String tableId, Fragment[] fragments) {
     fragmentMap.remove(tableId);
     for(Fragment t : fragments) {
-      if (fragmentMap.containsKey(t.getName())) {
-        fragmentMap.get(t.getName()).add(t);
+      if (fragmentMap.containsKey(t.getTableName())) {
+        fragmentMap.get(t.getTableName()).add(t.getProto());
       } else {
-        List<Fragment> frags = new ArrayList<Fragment>();
-        frags.add(t);
-        fragmentMap.put(t.getName(), frags);
+        List<FragmentProto> frags = new ArrayList<FragmentProto>();
+        frags.add(t.getProto());
+        fragmentMap.put(t.getTableName(), frags);
       }
     }
   }
@@ -202,7 +212,7 @@ public class TaskAttemptContext {
     this.progress = progress;
   }
 
-  public Fragment getTable(String id) {
+  public FragmentProto getTable(String id) {
     return fragmentMap.get(id).get(0);
   }
 
@@ -214,8 +224,8 @@ public class TaskAttemptContext {
     return fragmentMap.keySet();
   }
   
-  public Fragment [] getTables(String id) {
-    return fragmentMap.get(id).toArray(new Fragment[fragmentMap.get(id).size()]);
+  public FragmentProto [] getTables(String id) {
+    return fragmentMap.get(id).toArray(new FragmentProto[fragmentMap.get(id).size()]);
   }
   
   public int hashCode() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index 4d2bd6c..5f0457e 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
@@ -140,11 +141,11 @@ public class TestBNLJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
 
-    Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testBNLCrossJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
@@ -170,11 +171,11 @@ public class TestBNLJoinExec {
     Expr context = analyzer.parse(QUERIES[1]);
     LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
 
-    Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
 
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index 615befa..c9499d6 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
@@ -47,6 +49,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
@@ -156,10 +159,10 @@ public class TestBSTIndexExec {
     this.rndKey = rnd.nextInt(250);
     final String QUERY = "select * from employee where managerId = " + rndKey;
     
-    Fragment[] frags = StorageManager.splitNG(conf, "employee", meta, tablePath, Integer.MAX_VALUE);
+    FileFragment[] frags = StorageManager.splitNG(conf, "employee", meta, tablePath, Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testEqual");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
-        LocalTajoTestingUtility.newQueryUnitAttemptId(), new Fragment[] { frags[0] }, workDir);
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir);
     Expr expr = analyzer.parse(QUERY);
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = optimizer.optimize(plan);
@@ -192,11 +195,12 @@ public class TestBSTIndexExec {
       Preconditions.checkNotNull(ctx.getTable(scanNode.getTableName()),
           "Error: There is no table matched to %s", scanNode.getTableName());
 
-      Fragment[] fragments = ctx.getTables(scanNode.getTableName());
+      List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), meta.getStoreType(),
+          ctx.getTables(scanNode.getTableName()));
       
       Datum[] datum = new Datum[]{DatumFactory.createInt4(rndKey)};
 
-      return new BSTIndexScanExec(ctx, sm, scanNode, fragments[0], idxPath,
+      return new BSTIndexScanExec(ctx, sm, scanNode, fragments.get(0), idxPath,
           idxSchema, comp , datum);
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index 282f5be..e236126 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
@@ -109,11 +110,11 @@ public class TestExternalSortExec {
 
   @Test
   public final void testNext() throws IOException, PlanningException {
-    Fragment[] frags = sm.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
+    FileFragment[] frags = sm.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
-        LocalTajoTestingUtility.newQueryUnitAttemptId(), new Fragment[] { frags[0] }, workDir);
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir);
     ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(QUERIES[0]);
     LogicalPlan plan = planner.createPlan(expr);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
index c7fec48..63f14b7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
@@ -253,9 +254,9 @@ public class TestFullOuterHashJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
 
-    Fragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
-    Fragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
+    FileFragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestFullOuterHashJoinExec0");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -290,9 +291,9 @@ public class TestFullOuterHashJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
 
-    Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
-    Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestFullOuter_HashJoinExec1");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -326,9 +327,9 @@ public class TestFullOuterHashJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
 
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
-    Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
-    Fragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestFullOuterHashJoinExec2");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -363,10 +364,10 @@ public class TestFullOuterHashJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
 
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
-    Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+    FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestFullOuterHashJoinExec3");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(), merged,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
index 61802fd..c92d1c9 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
@@ -296,9 +297,9 @@ public class TestFullOuterMergeJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
-    Fragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
-    Fragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin0");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -331,9 +332,9 @@ public class TestFullOuterMergeJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
-    Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
-    Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin1");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -366,9 +367,9 @@ public class TestFullOuterMergeJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
-    Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
-    Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin2");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -402,9 +403,9 @@ public class TestFullOuterMergeJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
-    Fragment[] dep4Frags = StorageManager.splitNG(conf, "dep4", dep4.getMeta(), dep4.getPath(), Integer.MAX_VALUE);
-    Fragment[] merged = TUtil.concat(emp3Frags, dep4Frags);
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] dep4Frags = StorageManager.splitNG(conf, "dep4", dep4.getMeta(), dep4.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, dep4Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin3");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -440,10 +441,10 @@ public class TestFullOuterMergeJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
-    Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+    FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin4");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -477,10 +478,10 @@ public class TestFullOuterMergeJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
-    Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] merged = TUtil.concat(phone3Frags,emp3Frags);
+    FileFragment[] merged = TUtil.concat(phone3Frags,emp3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin5");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index e521a94..57f2376 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
@@ -140,12 +141,12 @@ public class TestHashAntiJoinExec {
 
   @Test
   public final void testHashAntiJoin() throws IOException, PlanningException {
-    Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
         Integer.MAX_VALUE);
 
-    Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashAntiJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index ba02c21..2a1af7c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
@@ -140,9 +141,9 @@ public class TestHashJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
 
-    Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE);
-    Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(), Integer.MAX_VALUE);
-    Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashInnerJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index e4dcb95..d03f3c2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
@@ -144,12 +145,12 @@ public class TestHashSemiJoinExec {
 
   @Test
   public final void testHashSemiJoin() throws IOException, PlanningException {
-    Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
         Integer.MAX_VALUE);
 
-    Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashSemiJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
index 58f9b3b..4ac423b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
@@ -254,9 +255,9 @@ public class TestLeftOuterHashJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
 
-    Fragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
-    Fragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
+    FileFragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuterHashJoinExec0");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -282,12 +283,12 @@ public class TestLeftOuterHashJoinExec {
 
   @Test
   public final void testLeftOuter_HashJoinExec1() throws IOException, PlanningException {
-    Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
         Integer.MAX_VALUE);
 
-    Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec1");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -322,12 +323,12 @@ public class TestLeftOuterHashJoinExec {
     @Test
   public final void testLeftOuter_HashJoinExec2() throws IOException, PlanningException {
     
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
         Integer.MAX_VALUE);
 
-    Fragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+    FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec2");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -363,12 +364,12 @@ public class TestLeftOuterHashJoinExec {
    @Test
   public final void testLeftOuter_HashJoinExec3() throws IOException, PlanningException {
     
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+    FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
         Integer.MAX_VALUE);
 
-    Fragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+    FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec3");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -404,12 +405,12 @@ public class TestLeftOuterHashJoinExec {
    @Test
   public final void testLeftOuter_HashJoinExec4() throws IOException, PlanningException {
     
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+    FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
         Integer.MAX_VALUE);
 
-    Fragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
+    FileFragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec4");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,