You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/10/29 11:25:50 UTC

[2/3] TAJO-287: Improve Fragment to be more generic. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
index 025af87..ce31851 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
@@ -19,8 +19,9 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -37,15 +38,16 @@ import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.apache.tajo.LocalTajoTestingUtility;
 
 import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class TestLeftOuterNLJoinExec {
   private TajoConf conf;
@@ -239,12 +241,12 @@ public class TestLeftOuterNLJoinExec {
 
   @Test
   public final void testLeftOuterNLJoinExec0() throws IOException, PlanningException {
-    Fragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(),
+    FileFragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
         Integer.MAX_VALUE);
 
-    Fragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
+    FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuterNLJoinExec0");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -279,12 +281,12 @@ public class TestLeftOuterNLJoinExec {
 
   @Test
   public final void testLeftOuterNLJoinExec1() throws IOException, PlanningException {
-    Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
         Integer.MAX_VALUE);
 
-    Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
 
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec1");
@@ -323,12 +325,12 @@ public class TestLeftOuterNLJoinExec {
 
   @Test
   public final void testLeftOuter_NLJoinExec2() throws IOException, PlanningException {
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
         Integer.MAX_VALUE);
 
-    Fragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+    FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec2");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -367,12 +369,12 @@ public class TestLeftOuterNLJoinExec {
 
   @Test
   public final void testLeftOuter_NLJoinExec3() throws IOException, PlanningException {
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+    FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
         Integer.MAX_VALUE);
 
-    Fragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+    FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec3");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -410,12 +412,12 @@ public class TestLeftOuterNLJoinExec {
 
     @Test
   public final void testLeftOuter_NLJoinExec4() throws IOException, PlanningException {
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+    FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
         Integer.MAX_VALUE);
 
-    Fragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
+    FileFragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec4");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index 5e72428..e6dd0a5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
@@ -155,9 +156,9 @@ public class TestMergeJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
-    Fragment[] empFrags = sm.splitNG(conf, "e", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE);
-    Fragment[] peopleFrags = sm.splitNG(conf, "p", people.getMeta(), people.getPath(), Integer.MAX_VALUE);
-    Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+    FileFragment[] empFrags = sm.splitNG(conf, "e", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = sm.splitNG(conf, "p", people.getMeta(), people.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testMergeInnerJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
index 3d356ee..50d431c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
@@ -138,12 +139,12 @@ public class TestNLJoinExec {
   
   @Test
   public final void testNLCrossJoin() throws IOException, PlanningException {
-    Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
         Integer.MAX_VALUE);
     
-    Fragment [] merged = TUtil.concat(empFrags, peopleFrags);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testNLCrossJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -166,12 +167,12 @@ public class TestNLJoinExec {
 
   @Test
   public final void testNLInnerJoin() throws IOException, PlanningException {
-    Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
         Integer.MAX_VALUE);
     
-    Fragment [] merged = TUtil.concat(empFrags, peopleFrags);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testNLInnerJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 8b50480..b57ef3a 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -45,6 +45,7 @@ import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.index.bst.BSTIndex;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
@@ -181,11 +182,11 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testCreateScanPlan() throws IOException, PlanningException {
-    Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
+    FileFragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
         employee.getPath(), Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateScanPlan");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new Fragment[] { frags[0] }, workDir);
+        new FileFragment[] { frags[0] }, workDir);
     Expr expr = analyzer.parse(QUERIES[0]);
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode =plan.getRootBlock().getRoot();
@@ -210,11 +211,11 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testCreateScanWithFilterPlan() throws IOException, PlanningException {
-    Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
+    FileFragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
         employee.getPath(), Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateScanWithFilterPlan");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new Fragment[] { frags[0] }, workDir);
+        new FileFragment[] { frags[0] }, workDir);
     Expr expr = analyzer.parse(QUERIES[16]);
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode =plan.getRootBlock().getRoot();
@@ -237,11 +238,11 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testGroupByPlan() throws IOException, PlanningException {
-    Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+    FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByPlan");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new Fragment[] { frags[0] }, workDir);
+        new FileFragment[] { frags[0] }, workDir);
     ctx.setEnforcer(new Enforcer());
     Expr context = analyzer.parse(QUERIES[7]);
     LogicalPlan plan = planner.createPlan(context);
@@ -267,12 +268,12 @@ public class TestPhysicalPlanner {
   @Test
   public final void testHashGroupByPlanWithALLField() throws IOException, PlanningException {
     // TODO - currently, this query does not use hash-based group operator.
-    Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+    FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir(
         "target/test-data/testHashGroupByPlanWithALLField");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new Fragment[] { frags[0] }, workDir);
+        new FileFragment[] { frags[0] }, workDir);
     ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(QUERIES[15]);
     LogicalPlan plan = planner.createPlan(expr);
@@ -297,11 +298,11 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testSortGroupByPlan() throws IOException, PlanningException {
-    Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+    FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testSortGroupByPlan");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new Fragment[]{frags[0]}, workDir);
+        new FileFragment[]{frags[0]}, workDir);
     ctx.setEnforcer(new Enforcer());
     Expr context = analyzer.parse(QUERIES[7]);
     LogicalPlan plan = planner.createPlan(context);
@@ -355,11 +356,11 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testStorePlan() throws IOException, PlanningException {
-    Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+    FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlan");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new Fragment[] { frags[0] }, workDir);
+        new FileFragment[] { frags[0] }, workDir);
     ctx.setEnforcer(new Enforcer());
     ctx.setOutputPath(new Path(workDir, "grouped1"));
 
@@ -376,7 +377,7 @@ public class TestPhysicalPlanner {
     exec.next();
     exec.close();
 
-    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(outputMeta, rootNode.getOutSchema(),
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(outputMeta, rootNode.getOutSchema(),
         ctx.getOutputPath());
     scanner.init();
     Tuple tuple;
@@ -396,11 +397,11 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testStorePlanWithRCFile() throws IOException, PlanningException {
-    Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+    FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlanWithRCFile");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new Fragment[] { frags[0] }, workDir);
+        new FileFragment[] { frags[0] }, workDir);
     ctx.setEnforcer(new Enforcer());
     ctx.setOutputPath(new Path(workDir, "grouped2"));
 
@@ -416,7 +417,7 @@ public class TestPhysicalPlanner {
     exec.next();
     exec.close();
 
-    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(outputMeta, rootNode.getOutSchema(),
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(outputMeta, rootNode.getOutSchema(),
         ctx.getOutputPath());
     scanner.init();
     Tuple tuple;
@@ -436,11 +437,11 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testPartitionedStorePlan() throws IOException, PlanningException {
-    Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+    FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
     QueryUnitAttemptId id = LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testPartitionedStorePlan");
-    TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new Fragment[] { frags[0] }, workDir);
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new FileFragment[] { frags[0] }, workDir);
     ctx.setEnforcer(new Enforcer());
     Expr context = analyzer.parse(QUERIES[7]);
     LogicalPlan plan = planner.createPlan(context);
@@ -468,12 +469,12 @@ public class TestPhysicalPlanner {
     FileStatus [] list = fs.listStatus(path);
     assertEquals(numPartitions, list.length);
 
-    Fragment [] fragments = new Fragment[list.length];
+    FileFragment[] fragments = new FileFragment[list.length];
     int i = 0;
     for (FileStatus status : list) {
-      fragments[i++] = new Fragment("partition", status.getPath(), 0, status.getLen());
+      fragments[i++] = new FileFragment("partition", status.getPath(), 0, status.getLen());
     }
-    Scanner scanner = new MergeScanner(conf, outputMeta,rootNode.getOutSchema(), TUtil.newList(fragments));
+    Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), outputMeta, TUtil.newList(fragments));
     scanner.init();
 
     Tuple tuple;
@@ -494,13 +495,13 @@ public class TestPhysicalPlanner {
   @Test
   public final void testPartitionedStorePlanWithEmptyGroupingSet()
       throws IOException, PlanningException {
-    Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+    FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
     QueryUnitAttemptId id = LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan);
 
     Path workDir = CommonTestingUtil.getTestDir(
         "target/test-data/testPartitionedStorePlanWithEmptyGroupingSet");
-    TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new Fragment[] { frags[0] }, workDir);
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new FileFragment[] { frags[0] }, workDir);
     ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(QUERIES[14]);
     LogicalPlan plan = planner.createPlan(expr);
@@ -526,12 +527,12 @@ public class TestPhysicalPlanner {
     FileStatus [] list = fs.listStatus(path);
     assertEquals(numPartitions, list.length);
 
-    Fragment [] fragments = new Fragment[list.length];
+    FileFragment[] fragments = new FileFragment[list.length];
     int i = 0;
     for (FileStatus status : list) {
-      fragments[i++] = new Fragment("partition", status.getPath(), 0, status.getLen());
+      fragments[i++] = new FileFragment("partition", status.getPath(), 0, status.getLen());
     }
-    Scanner scanner = new MergeScanner(conf, outputMeta,rootNode.getOutSchema(), TUtil.newList(fragments));
+    Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), outputMeta, TUtil.newList(fragments));
     scanner.init();
     Tuple tuple;
     i = 0;
@@ -550,11 +551,11 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testAggregationFunction() throws IOException, PlanningException {
-    Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+    FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testAggregationFunction");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new Fragment[] { frags[0] }, workDir);
+        new FileFragment[] { frags[0] }, workDir);
     ctx.setEnforcer(new Enforcer());
     Expr context = analyzer.parse(QUERIES[8]);
     LogicalPlan plan = planner.createPlan(context);
@@ -584,11 +585,11 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testCountFunction() throws IOException, PlanningException {
-    Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+    FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCountFunction");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new Fragment[] { frags[0] }, workDir);
+        new FileFragment[] { frags[0] }, workDir);
     ctx.setEnforcer(new Enforcer());
     Expr context = analyzer.parse(QUERIES[9]);
     LogicalPlan plan = planner.createPlan(context);
@@ -616,11 +617,11 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testGroupByWithNullValue() throws IOException, PlanningException {
-    Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+    FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByWithNullValue");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new Fragment[] { frags[0] }, workDir);
+        new FileFragment[] { frags[0] }, workDir);
     ctx.setEnforcer(new Enforcer());
     Expr context = analyzer.parse(QUERIES[11]);
     LogicalPlan plan = planner.createPlan(context);
@@ -640,11 +641,11 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testUnionPlan() throws IOException, PlanningException {
-    Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
+    FileFragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testUnionPlan");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new Fragment[] { frags[0] }, workDir);
+        new FileFragment[] { frags[0] }, workDir);
     Expr  context = analyzer.parse(QUERIES[0]);
     LogicalPlan plan = planner.createPlan(context);
     LogicalNode rootNode = optimizer.optimize(plan);
@@ -668,7 +669,7 @@ public class TestPhysicalPlanner {
   public final void testEvalExpr() throws IOException, PlanningException {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testEvalExpr");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new Fragment[] { }, workDir);
+        new FileFragment[] { }, workDir);
     Expr expr = analyzer.parse(QUERIES[12]);
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = optimizer.optimize(plan);
@@ -700,11 +701,11 @@ public class TestPhysicalPlanner {
 
   //@Test
   public final void testCreateIndex() throws IOException, PlanningException {
-    Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
+    FileFragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateIndex");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new Fragment[] {frags[0]}, workDir);
+        new FileFragment[] {frags[0]}, workDir);
     Expr context = analyzer.parse(createIndexStmt[0]);
     LogicalPlan plan = planner.createPlan(context);
     LogicalNode rootNode = optimizer.optimize(plan);
@@ -726,12 +727,12 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testDuplicateEliminate() throws IOException, PlanningException {
-    Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+    FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testDuplicateEliminate");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new Fragment[] {frags[0]}, workDir);
+        new FileFragment[] {frags[0]}, workDir);
     ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(duplicateElimination[0]);
     LogicalPlan plan = planner.createPlan(expr);
@@ -759,12 +760,12 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testIndexedStoreExec() throws IOException, PlanningException {
-    Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
+    FileFragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
         employee.getPath(), Integer.MAX_VALUE);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testIndexedStoreExec");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new Fragment[] {frags[0]}, workDir);
+        new FileFragment[] {frags[0]}, workDir);
     ctx.setEnforcer(new Enforcer());
     Expr context = analyzer.parse(SORT_QUERY[0]);
     LogicalPlan plan = planner.createPlan(context);
@@ -848,7 +849,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testSortEnforcer() throws IOException, PlanningException {
-    Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
+    FileFragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
         employee.getPath(), Integer.MAX_VALUE);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testSortEnforcer");
@@ -862,7 +863,7 @@ public class TestPhysicalPlanner {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceSortAlgorithm(sortNode.getPID(), SortAlgorithm.IN_MEMORY_SORT);
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new Fragment[] {frags[0]}, workDir);
+        new FileFragment[] {frags[0]}, workDir);
     ctx.setEnforcer(enforcer);
 
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
@@ -883,7 +884,7 @@ public class TestPhysicalPlanner {
     enforcer = new Enforcer();
     enforcer.enforceSortAlgorithm(sortNode.getPID(), SortAlgorithm.MERGE_SORT);
     ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new Fragment[] {frags[0]}, workDir);
+        new FileFragment[] {frags[0]}, workDir);
     ctx.setEnforcer(enforcer);
 
     phyPlanner = new PhysicalPlannerImpl(conf,sm);
@@ -897,7 +898,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testGroupByEnforcer() throws IOException, PlanningException {
-    Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), Integer.MAX_VALUE);
+    FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), Integer.MAX_VALUE);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByEnforcer");
     Expr context = analyzer.parse(QUERIES[7]);
@@ -910,7 +911,7 @@ public class TestPhysicalPlanner {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceHashAggregation(groupByNode.getPID());
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new Fragment[] {frags[0]}, workDir);
+        new FileFragment[] {frags[0]}, workDir);
     ctx.setEnforcer(enforcer);
 
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
@@ -931,7 +932,7 @@ public class TestPhysicalPlanner {
     enforcer = new Enforcer();
     enforcer.enforceSortAggregation(groupByNode.getPID(), null);
     ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new Fragment[] {frags[0]}, workDir);
+        new FileFragment[] {frags[0]}, workDir);
     ctx.setEnforcer(enforcer);
 
     phyPlanner = new PhysicalPlannerImpl(conf,sm);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
index c95881a..d582e2b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
@@ -19,8 +19,9 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -37,14 +38,15 @@ import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.apache.tajo.LocalTajoTestingUtility;
 
 import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 // this is not a physical operator in itself, but it uses the HashLeftOuterJoinExec with switched inputs order
 public class TestRightOuterHashJoinExec {
@@ -217,12 +219,12 @@ public class TestRightOuterHashJoinExec {
 
   @Test
   public final void testRightOuter_HashJoinExec0() throws IOException, PlanningException {
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(),
+    FileFragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(),
         Integer.MAX_VALUE);
 
-    Fragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
+    FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestRightOuter_HashJoinExec0");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -257,12 +259,12 @@ public class TestRightOuterHashJoinExec {
 
   @Test
   public final void testRightOuter_HashJoinExec1() throws IOException, PlanningException {
-    Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
         Integer.MAX_VALUE);
 
-    Fragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+    FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestRightOuter_HashJoinExec1");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -297,12 +299,12 @@ public class TestRightOuterHashJoinExec {
     @Test
   public final void testRightOuter_HashJoinExec2() throws IOException, PlanningException {
     
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
         Integer.MAX_VALUE);
 
-    Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestRightOuter_HashJoinExec2");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
index bf08479..5bbb4aa 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
@@ -299,9 +300,9 @@ public class TestRightOuterMergeJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
-    Fragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
-    Fragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin0");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -334,9 +335,9 @@ public class TestRightOuterMergeJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
-    Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
-    Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin1");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -368,9 +369,9 @@ public class TestRightOuterMergeJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
-    Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
-    Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin2");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -402,9 +403,9 @@ public class TestRightOuterMergeJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
-    Fragment[] dep4Frags = StorageManager.splitNG(conf, "dep4", dep4.getMeta(), dep4.getPath(), Integer.MAX_VALUE);
-    Fragment[] merged = TUtil.concat(emp3Frags, dep4Frags);
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] dep4Frags = StorageManager.splitNG(conf, "dep4", dep4.getMeta(), dep4.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, dep4Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuter_MergeJoin3");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -437,10 +438,10 @@ public class TestRightOuterMergeJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
-    Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+    FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuter_MergeJoin4");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -472,10 +473,10 @@ public class TestRightOuterMergeJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
-    Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
-    Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
         Integer.MAX_VALUE);
-    Fragment[] merged = TUtil.concat(phone3Frags,emp3Frags);
+    FileFragment[] merged = TUtil.concat(phone3Frags,emp3Frags);
 
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin5");

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index 3dc3a4a..45badd5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
@@ -108,10 +109,10 @@ public class TestSortExec {
 
   @Test
   public final void testNext() throws IOException, PlanningException {
-    Fragment [] frags = sm.splitNG(conf, "employee", employeeMeta, tablePath, Integer.MAX_VALUE);
+    FileFragment[] frags = sm.splitNG(conf, "employee", employeeMeta, tablePath, Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestSortExec");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility
-        .newQueryUnitAttemptId(), new Fragment[] { frags[0] }, workDir);
+        .newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir);
     ctx.setEnforcer(new Enforcer());
     Expr context = analyzer.parse(QUERIES[0]);
     LogicalPlan plan = planner.createPlan(context);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFileFragment.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFileFragment.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFileFragment.java
new file mode 100644
index 0000000..684cb58
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFileFragment.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.SortedSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestFileFragment {
+  private Path path;
+  
+  @Before
+  public final void setUp() throws Exception {
+    path = CommonTestingUtil.getTestDir();
+  }
+
+  @Test
+  public final void testGetAndSetFields() {
+    FileFragment fragment1 = new FileFragment("table1_1", new Path(path, "table0"), 0, 500);
+    fragment1.setDistCached();
+
+    assertEquals("table1_1", fragment1.getTableName());
+    assertEquals(new Path(path, "table0"), fragment1.getPath());
+    assertTrue(fragment1.isDistCached());
+    assertTrue(0 == fragment1.getStartKey());
+    assertTrue(500 == fragment1.getEndKey());
+  }
+
+  @Test
+  public final void testGetProtoAndRestore() {
+    FileFragment fragment = new FileFragment("table1_1", new Path(path, "table0"), 0, 500);
+    fragment.setDistCached();
+
+    FileFragment fragment1 = FragmentConvertor.convert(FileFragment.class, fragment.getProto());
+    assertEquals("table1_1", fragment1.getTableName());
+    assertEquals(new Path(path, "table0"), fragment1.getPath());
+    assertTrue(fragment.isDistCached());
+    assertTrue(0 == fragment1.getStartKey());
+    assertTrue(500 == fragment1.getEndKey());
+  }
+
+  @Test
+  public final void testCompareTo() {
+    final int num = 10;
+    FileFragment[] tablets = new FileFragment[num];
+    for (int i = num - 1; i >= 0; i--) {
+      tablets[i] = new FileFragment("tablet1_"+i, new Path(path, "tablet0"), i * 500, (i+1) * 500);
+    }
+    
+    Arrays.sort(tablets);
+
+    for(int i = 0; i < num; i++) {
+      assertEquals("tablet1_"+i, tablets[i].getTableName());
+    }
+  }
+
+  @Test
+  public final void testCompareTo2() {
+    final int num = 1860;
+    FileFragment[] tablets = new FileFragment[num];
+    for (int i = num - 1; i >= 0; i--) {
+      tablets[i] = new FileFragment("tablet1_"+i, new Path(path, "tablet0"), (long)i * 6553500, (long)(i+1) * 6553500);
+    }
+
+    SortedSet sortedSet = Sets.newTreeSet();
+    for (FileFragment frag : tablets) {
+      sortedSet.add(frag);
+    }
+    assertEquals(num, sortedSet.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java
deleted file mode 100644
index 630acd1..0000000
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.collect.Sets;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.SortedSet;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestFragment {
-  private Schema schema1;
-  private Path path;
-  
-  @Before
-  public final void setUp() throws Exception {
-    schema1 = new Schema();
-    schema1.addColumn("id", Type.INT4);
-    schema1.addColumn("name", Type.TEXT);
-    path = CommonTestingUtil.getTestDir();
-  }
-
-  @Test
-  public final void testGetAndSetFields() {
-    Fragment fragment1 = new Fragment("table1_1", new Path(path, "table0"), 0, 500);
-    fragment1.setDistCached();
-
-    assertEquals("table1_1", fragment1.getName());
-    assertEquals(new Path(path, "table0"), fragment1.getPath());
-    assertTrue(fragment1.isDistCached());
-    assertTrue(0 == fragment1.getStartOffset());
-    assertTrue(500 == fragment1.getLength());
-  }
-
-  @Test
-  public final void testTabletTabletProto() {
-    Fragment fragment0 = new Fragment("table1_1", new Path(path, "table0"), 0, 500);
-    fragment0.setDistCached();
-    Fragment fragment1 = new Fragment(fragment0.getProto());
-    assertEquals("table1_1", fragment1.getName());
-    assertEquals(new Path(path, "table0"), fragment1.getPath());
-    assertTrue(fragment1.isDistCached());
-    assertTrue(0 == fragment1.getStartOffset());
-    assertTrue(500 == fragment1.getLength());
-  }
-
-  @Test
-  public final void testCompareTo() {
-    final int num = 10;
-    Fragment [] tablets = new Fragment[num];
-    for (int i = num - 1; i >= 0; i--) {
-      tablets[i] = new Fragment("tablet1_"+i, new Path(path, "tablet0"), i * 500, (i+1) * 500);
-    }
-    
-    Arrays.sort(tablets);
-
-    for(int i = 0; i < num; i++) {
-      assertEquals("tablet1_"+i, tablets[i].getName());
-    }
-  }
-
-  @Test
-  public final void testCompareTo2() {
-    final int num = 1860;
-    Fragment [] tablets = new Fragment[num];
-    for (int i = num - 1; i >= 0; i--) {
-      tablets[i] = new Fragment("tablet1_"+i, new Path(path, "tablet0"), (long)i * 6553500, (long)(i+1) * 6553500);
-    }
-
-    SortedSet sortedSet = Sets.newTreeSet();
-    for (Fragment frag : tablets) {
-      sortedSet.add(frag);
-    }
-    assertEquals(num, sortedSet.size());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
index 2678a54..99d89c0 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
@@ -35,6 +35,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.FileUtil;
 import org.junit.After;
 import org.junit.Before;
@@ -106,7 +107,7 @@ public class TestRowFile {
     TableProto proto = (TableProto) FileUtil.loadProto(
         cluster.getDefaultFileSystem(), metaPath, TableProto.getDefaultInstance());
     meta = new TableMeta(proto);
-    Fragment fragment = new Fragment("test.tbl", dataPath, 0, file.getLen());
+    FileFragment fragment = new FileFragment("test.tbl", dataPath, 0, file.getLen());
 
     int tupleCnt = 0;
     start = System.currentTimeMillis();
@@ -124,8 +125,8 @@ public class TestRowFile {
     long fileLen = file.getLen()/13;
 
     for (int i = 0; i < 13; i++) {
-      fragment = new Fragment("test.tbl", dataPath, fileStart, fileLen);
-      scanner = new RowFile.RowFileScanner(conf, meta, schema, fragment);
+      fragment = new FileFragment("test.tbl", dataPath, fileStart, fileLen);
+      scanner = new RowFile.RowFileScanner(conf, schema, meta, fragment);
       scanner.init();
       while ((tuple=scanner.next()) != null) {
         if (!idSet.remove(tuple.get(0).asInt4())) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 3b568fc..6934872 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -37,6 +37,7 @@ import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.physical.*;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.index.bst.BSTIndex;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.worker.dataserver.retriever.FileChunk;
@@ -127,11 +128,11 @@ public class TestRangeRetrieverHandler {
     TableDesc employee = new TableDesc("employee", schema, employeeMeta, tableDir);
     catalog.addTable(employee);
 
-    Fragment[] frags = StorageManager.splitNG(conf, "employee", employeeMeta, tableDir, Integer.MAX_VALUE);
+    FileFragment[] frags = StorageManager.splitNG(conf, "employee", employeeMeta, tableDir, Integer.MAX_VALUE);
 
     TaskAttemptContext
         ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(),
-        new Fragment[] {frags[0]}, testDir);
+        new FileFragment[] {frags[0]}, testDir);
     Expr expr = analyzer.parse(SORT_QUERY[0]);
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = optimizer.optimize(plan);
@@ -240,11 +241,11 @@ public class TestRangeRetrieverHandler {
     TableDesc employee = new TableDesc("employee", schema, meta, tablePath);
     catalog.addTable(employee);
 
-    Fragment[] frags = sm.splitNG(conf, "employee", meta, tablePath, Integer.MAX_VALUE);
+    FileFragment[] frags = sm.splitNG(conf, "employee", meta, tablePath, Integer.MAX_VALUE);
 
     TaskAttemptContext
         ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(),
-        new Fragment[] {frags[0]}, testDir);
+        new FileFragment[] {frags[0]}, testDir);
     Expr expr = analyzer.parse(SORT_QUERY[1]);
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = optimizer.optimize(plan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
index 0e695a3..efdd023 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
@@ -30,17 +30,20 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.annotation.ForSplitableStore;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.util.Bytes;
 import org.apache.tajo.util.FileUtil;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
 public abstract class AbstractStorageManager {
   private final Log LOG = LogFactory.getLog(AbstractStorageManager.class);
 
@@ -82,16 +85,23 @@ public abstract class AbstractStorageManager {
       LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
   }
 
-  public Scanner getScanner(TableMeta meta, Schema schema, Path path)
+  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
       throws IOException {
     FileSystem fs = path.getFileSystem(conf);
     FileStatus status = fs.getFileStatus(path);
-    Fragment fragment = new Fragment(path.getName(), path, 0, status.getLen());
+    FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
     return getScanner(meta, schema, fragment);
   }
 
-  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment)
-      throws IOException {
+  public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment) throws IOException {
+    return getScanner(meta, schema, FragmentConvertor.convert(conf, meta.getStoreType(), fragment), schema);
+  }
+
+  public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException {
+    return getScanner(meta, schema, FragmentConvertor.convert(conf, meta.getStoreType(), fragment), target);
+  }
+
+  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException {
     return getScanner(meta, schema, fragment, schema);
   }
 
@@ -175,51 +185,51 @@ public abstract class AbstractStorageManager {
     return meta;
   }
 
-  public Fragment[] split(String tableName) throws IOException {
+  public FileFragment[] split(String tableName) throws IOException {
     Path tablePath = new Path(tableBaseDir, tableName);
     return split(tableName, tablePath, fs.getDefaultBlockSize());
   }
 
-  public Fragment[] split(String tableName, long fragmentSize) throws IOException {
+  public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
     Path tablePath = new Path(tableBaseDir, tableName);
     return split(tableName, tablePath, fragmentSize);
   }
 
-  public Fragment[] splitBroadcastTable(Path tablePath) throws IOException {
+  public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException {
     FileSystem fs = tablePath.getFileSystem(conf);
     TableMeta meta = getTableMeta(tablePath);
-    List<Fragment> listTablets = new ArrayList<Fragment>();
-    Fragment tablet;
+    List<FileFragment> listTablets = new ArrayList<FileFragment>();
+    FileFragment tablet;
 
     FileStatus[] fileLists = fs.listStatus(tablePath);
     for (FileStatus file : fileLists) {
-      tablet = new Fragment(tablePath.getName(), file.getPath(), 0, file.getLen());
+      tablet = new FileFragment(tablePath.getName(), file.getPath(), 0, file.getLen());
       listTablets.add(tablet);
     }
 
-    Fragment[] tablets = new Fragment[listTablets.size()];
+    FileFragment[] tablets = new FileFragment[listTablets.size()];
     listTablets.toArray(tablets);
 
     return tablets;
   }
 
-  public Fragment[] split(Path tablePath) throws IOException {
+  public FileFragment[] split(Path tablePath) throws IOException {
     FileSystem fs = tablePath.getFileSystem(conf);
     return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
   }
 
-  public Fragment[] split(String tableName, Path tablePath) throws IOException {
+  public FileFragment[] split(String tableName, Path tablePath) throws IOException {
     return split(tableName, tablePath, fs.getDefaultBlockSize());
   }
 
-  private Fragment[] split(String tableName, Path tablePath, long size)
+  private FileFragment[] split(String tableName, Path tablePath, long size)
       throws IOException {
     FileSystem fs = tablePath.getFileSystem(conf);
 
     TableMeta meta = getTableMeta(tablePath);
     long defaultBlockSize = size;
-    List<Fragment> listTablets = new ArrayList<Fragment>();
-    Fragment tablet;
+    List<FileFragment> listTablets = new ArrayList<FileFragment>();
+    FileFragment tablet;
 
     FileStatus[] fileLists = fs.listStatus(tablePath);
     for (FileStatus file : fileLists) {
@@ -227,31 +237,31 @@ public abstract class AbstractStorageManager {
       long start = 0;
       if (remainFileSize > defaultBlockSize) {
         while (remainFileSize > defaultBlockSize) {
-          tablet = new Fragment(tableName, file.getPath(), start, defaultBlockSize);
+          tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
           listTablets.add(tablet);
           start += defaultBlockSize;
           remainFileSize -= defaultBlockSize;
         }
-        listTablets.add(new Fragment(tableName, file.getPath(), start, remainFileSize));
+        listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
       } else {
-        listTablets.add(new Fragment(tableName, file.getPath(), 0, remainFileSize));
+        listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
       }
     }
 
-    Fragment[] tablets = new Fragment[listTablets.size()];
+    FileFragment[] tablets = new FileFragment[listTablets.size()];
     listTablets.toArray(tablets);
 
     return tablets;
   }
 
-  public static Fragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
+  public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
                                    Path tablePath, long size)
       throws IOException {
     FileSystem fs = tablePath.getFileSystem(conf);
 
     long defaultBlockSize = size;
-    List<Fragment> listTablets = new ArrayList<Fragment>();
-    Fragment tablet;
+    List<FileFragment> listTablets = new ArrayList<FileFragment>();
+    FileFragment tablet;
 
     FileStatus[] fileLists = fs.listStatus(tablePath);
     for (FileStatus file : fileLists) {
@@ -259,18 +269,18 @@ public abstract class AbstractStorageManager {
       long start = 0;
       if (remainFileSize > defaultBlockSize) {
         while (remainFileSize > defaultBlockSize) {
-          tablet = new Fragment(tableName, file.getPath(), start, defaultBlockSize);
+          tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
           listTablets.add(tablet);
           start += defaultBlockSize;
           remainFileSize -= defaultBlockSize;
         }
-        listTablets.add(new Fragment(tableName, file.getPath(), start, remainFileSize));
+        listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
       } else {
-        listTablets.add(new Fragment(tableName, file.getPath(), 0, remainFileSize));
+        listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
       }
     }
 
-    Fragment[] tablets = new Fragment[listTablets.size()];
+    FileFragment[] tablets = new FileFragment[listTablets.size()];
     listTablets.toArray(tablets);
 
     return tablets;
@@ -396,24 +406,10 @@ public abstract class AbstractStorageManager {
    * @return is this file isSplittable?
    */
   protected boolean isSplittable(TableMeta meta, Schema schema, Path filename) throws IOException {
-    Scanner scanner = getScanner(meta, schema, filename);
+    Scanner scanner = getFileScanner(meta, schema, filename);
     return scanner.isSplittable();
   }
 
-  protected boolean isSplittable(CatalogProtos.StoreType storeType) throws IOException {
-    Method[] methods = getScannerClass(storeType).getMethods();
-
-    for (Method method : methods) {
-      ForSplitableStore annos = method.getAnnotation(ForSplitableStore.class);
-      if (annos != null) {
-        return true;
-      }
-    }
-
-    return false;
-  }
-
-
   @Deprecated
   protected long computeSplitSize(long blockSize, long minSize,
                                   long maxSize) {
@@ -444,17 +440,17 @@ public abstract class AbstractStorageManager {
    * A factory that makes the split for this class. It can be overridden
    * by sub-classes to make sub-types
    */
-  protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length) {
-    return new Fragment(fragmentId, file, start, length);
+  protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length) {
+    return new FileFragment(fragmentId, file, start, length);
   }
 
-  protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, BlockLocation blockLocation,
+  protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, BlockLocation blockLocation,
                                int[] diskIds) throws IOException {
-    return new Fragment(fragmentId, file, blockLocation, diskIds);
+    return new FileFragment(fragmentId, file, blockLocation, diskIds);
   }
 
   // for Non Splittable. eg, compressed gzip TextFile
-  protected Fragment makeNonSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
+  protected FileFragment makeNonSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
                                   BlockLocation[] blkLocations) throws IOException {
 
     Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
@@ -485,7 +481,7 @@ public abstract class AbstractStorageManager {
       hosts[i] = entry.getKey();
       hostsBlockCount[i] = entry.getValue();
     }
-    return new Fragment(fragmentId, file, start, length, hosts, hostsBlockCount);
+    return new FileFragment(fragmentId, file, start, length, hosts, hostsBlockCount);
   }
 
   /**
@@ -536,9 +532,9 @@ public abstract class AbstractStorageManager {
    * Generate the map of host and make them into Volume Ids.
    *
    */
-  private Map<String, Set<Integer>> getVolumeMap(List<Fragment> frags) {
+  private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) {
     Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
-    for (Fragment frag : frags) {
+    for (FileFragment frag : frags) {
       String[] hosts = frag.getHosts();
       int[] diskIds = frag.getDiskIds();
       for (int i = 0; i < hosts.length; i++) {
@@ -561,10 +557,10 @@ public abstract class AbstractStorageManager {
    *
    * @throws IOException
    */
-  public List<Fragment> getSplits(String tableName, TableMeta meta, Schema schema, Path inputPath) throws IOException {
+  public List<FileFragment> getSplits(String tableName, TableMeta meta, Schema schema, Path inputPath) throws IOException {
     // generate splits'
 
-    List<Fragment> splits = new ArrayList<Fragment>();
+    List<FileFragment> splits = new ArrayList<FileFragment>();
     List<FileStatus> files = listStatus(inputPath);
     FileSystem fs = inputPath.getFileSystem(conf);
     for (FileStatus file : files) {
@@ -613,22 +609,22 @@ public abstract class AbstractStorageManager {
 
   private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
       Configuration.class,
-      TableMeta.class,
       Schema.class,
-      Fragment.class
+      TableMeta.class,
+      FileFragment.class
   };
 
   private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
       Configuration.class,
-      TableMeta.class,
       Schema.class,
+      TableMeta.class,
       Path.class
   };
 
   /**
    * create a scanner instance.
    */
-  public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, TableMeta meta, Schema schema,
+  public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta,
                                          Fragment fragment) {
     T result;
     try {
@@ -638,7 +634,7 @@ public abstract class AbstractStorageManager {
         meth.setAccessible(true);
         CONSTRUCTOR_CACHE.put(theClass, meth);
       }
-      result = meth.newInstance(new Object[]{conf, meta, schema, fragment});
+      result = meth.newInstance(new Object[]{conf, schema, meta, fragment});
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -659,7 +655,7 @@ public abstract class AbstractStorageManager {
         meth.setAccessible(true);
         CONSTRUCTOR_CACHE.put(theClass, meth);
       }
-      result = meth.newInstance(new Object[]{conf, meta, schema, path});
+      result = meth.newInstance(new Object[]{conf, schema, meta, path});
     } catch (Exception e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
index 5ac989d..6760367 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -39,9 +39,9 @@ import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.datum.ProtobufDatum;
 import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
 import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.annotation.ForSplitableStore;
 import org.apache.tajo.storage.compress.CodecPool;
 import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.Bytes;
 
 import java.io.*;
@@ -75,8 +75,8 @@ public class CSVFile {
     private byte[] nullChars;
     private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
 
-    public CSVAppender(Configuration conf, final TableMeta meta, final Schema schema, final Path path) throws IOException {
-      super(conf, meta, schema, path);
+    public CSVAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) throws IOException {
+      super(conf, schema, meta, path);
       this.fs = path.getFileSystem(conf);
       this.meta = meta;
       this.schema = schema;
@@ -315,11 +315,10 @@ public class CSVFile {
     }
   }
 
-  @ForSplitableStore
   public static class CSVScanner extends FileScanner implements SeekableScanner {
-    public CSVScanner(Configuration conf, final TableMeta meta, final Schema schema, final Fragment fragment)
+    public CSVScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment)
         throws IOException {
-      super(conf, meta, schema, fragment);
+      super(conf, schema, meta, fragment);
       factory = new CompressionCodecFactory(conf);
       codec = factory.getCodec(fragment.getPath());
       if (isCompress() && !(codec instanceof SplittableCompressionCodec)) {
@@ -365,11 +364,11 @@ public class CSVFile {
     @Override
     public void init() throws IOException {
 
-      // Fragment information
+      // FileFragment information
       fs = fragment.getPath().getFileSystem(conf);
       fis = fs.open(fragment.getPath());
-      startOffset = fragment.getStartOffset();
-      length = fragment.getLength();
+      startOffset = fragment.getStartKey();
+      length = fragment.getEndKey();
 
       if(startOffset > 0) startOffset--; // prev line feed
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
index 55a9bf3..064841f 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
@@ -35,7 +35,7 @@ public abstract class FileAppender implements Appender {
 
   protected boolean enabledStats;
   
-  public FileAppender(Configuration conf, TableMeta meta, Schema schema, Path path) {
+  public FileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) {
     this.conf = conf;
     this.meta = meta;
     this.schema = schema;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
index 9c22b4f..a9cfe1a 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.fragment.FileFragment;
 
 import java.io.IOException;
 
@@ -30,12 +31,12 @@ public abstract class FileScanner implements Scanner {
   protected final Configuration conf;
   protected final TableMeta meta;
   protected final Schema schema;
-  protected final Fragment fragment;
+  protected final FileFragment fragment;
   protected final int columnNum;
 
   protected Column [] targets;
   
-  public FileScanner(Configuration conf, final TableMeta meta, final Schema schema, final Fragment fragment) {
+  public FileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment) {
     this.conf = conf;
     this.meta = meta;
     this.schema = schema;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
deleted file mode 100644
index f40c9a7..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.base.Objects;
-import com.google.gson.annotations.Expose;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import org.apache.tajo.util.TUtil;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-public class Fragment implements Comparable<Fragment> {
-  protected FragmentProto.Builder builder = null;
-
-  @Expose private String tableName; // required
-  @Expose private Path uri; // required
-  @Expose private Long startOffset; // required
-  @Expose private Long length; // required
-  @Expose private boolean distCached = false; // optional
-
-  private String[] hosts; // Datanode hostnames
-  @Expose private int[] hostsBlockCount; // list of block count of hosts
-  @Expose private int[] diskIds;
-
-  public Fragment() {
-    builder = FragmentProto.newBuilder();
-  }
-
-  public Fragment(String tableName, Path uri, BlockLocation blockLocation, int[] diskIds)
-      throws IOException {
-    this();
-    this.set(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(),
-        blockLocation.getHosts(), diskIds);
-  }
-
-  // Non splittable
-  public Fragment(String tableName, Path uri, long start, long length, String[] hosts,
-                  int[] hostsBlockCount) {
-    this();
-    this.set(tableName, uri, start, length, null, null);
-    this.hosts = hosts;
-    this.hostsBlockCount = hostsBlockCount;
-  }
-
-  public Fragment(String fragmentId, Path path, long start, long length) {
-    this();
-    this.set(fragmentId, path, start, length, null, null);
-  }
-
-  public Fragment(FragmentProto proto) {
-    this();
-    int[] diskIds = new int[proto.getDiskIdsList().size()];
-    int i = 0;
-    for(Integer eachValue: proto.getDiskIdsList()) {
-      diskIds[i++] = eachValue;
-    }
-    this.set(proto.getId(), new Path(proto.getPath()),
-        proto.getStartOffset(), proto.getLength(),
-        proto.getHostsList().toArray(new String[]{}),
-        diskIds);
-    if (proto.hasDistCached() && proto.getDistCached()) {
-      distCached = true;
-    }
-  }
-
-  private void set(String tableName, Path path, long start,
-      long length, String[] hosts, int[] diskIds) {
-    this.tableName = tableName;
-    this.uri = path;
-    this.startOffset = start;
-    this.length = length;
-    this.hosts = hosts;
-    this.diskIds = diskIds;
-  }
-
-
-  /**
-   * Get the list of hosts (hostname) hosting this block
-   */
-  public String[] getHosts() {
-    if (hosts == null) {
-      this.hosts = new String[0];
-    }
-    return hosts;
-  }
-
-  /**
-   * Get the list of hosts block count
-   * if a fragment given multiple block, it returned 'host0:3, host1:1 ...'
-   */
-  public int[] getHostsBlockCount() {
-    if (hostsBlockCount == null) {
-      this.hostsBlockCount = new int[getHosts().length];
-      Arrays.fill(this.hostsBlockCount, 1);
-    }
-    return hostsBlockCount;
-  }
-
-  /**
-   * Get the list of Disk Ids
-   * Unknown disk is -1. Others 0 ~ N
-   */
-  public int[] getDiskIds() {
-    if (diskIds == null) {
-      this.diskIds = new int[getHosts().length];
-      Arrays.fill(this.diskIds, -1);
-    }
-    return diskIds;
-  }
-
-  public String getName() {
-    return this.tableName;
-  }
-
-  public void setName(String tableName) {
-    this.tableName = tableName;
-  }
-
-  public Path getPath() {
-    return this.uri;
-  }
-
-  public void setPath(Path path) {
-    this.uri = path;
-  }
-
-  public Long getStartOffset() {
-    return this.startOffset;
-  }
-
-  public Long getLength() {
-    return this.length;
-  }
-
-  public Boolean isDistCached() {
-    return this.distCached;
-  }
-
-  public void setDistCached() {
-    this.distCached = true;
-  }
-
-  /**
-   * 
-   * The offset range of tablets <b>MUST NOT</b> be overlapped.
-   * 
-   * @param t
-   * @return If the table paths are not same, return -1.
-   */
-  @Override
-  public int compareTo(Fragment t) {
-    if (getPath().equals(t.getPath())) {
-      long diff = this.getStartOffset() - t.getStartOffset();
-      if (diff < 0) {
-        return -1;
-      } else if (diff > 0) {
-        return 1;
-      } else {
-        return 0;
-      }
-    } else {
-      return -1;
-    }
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (o instanceof Fragment) {
-      Fragment t = (Fragment) o;
-      if (getPath().equals(t.getPath())
-          && TUtil.checkEquals(t.getStartOffset(), this.getStartOffset())
-          && TUtil.checkEquals(t.getLength(), this.getLength())
-          && TUtil.checkEquals(t.isDistCached(), this.isDistCached())) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(tableName, uri, startOffset, length, isDistCached());
-  }
-  
-  public Object clone() throws CloneNotSupportedException {
-    Fragment frag = (Fragment) super.clone();
-    frag.builder = FragmentProto.newBuilder();
-    frag.tableName = tableName;
-    frag.uri = uri;
-    frag.distCached = distCached;
-    frag.diskIds = diskIds;
-    frag.hosts = hosts;
-    frag.hostsBlockCount = hostsBlockCount;
-    
-    return frag;
-  }
-
-  @Override
-  public String toString() {
-    return "\"fragment\": {\"id\": \""+ tableName +"\", \"path\": "
-    		+getPath() + "\", \"start\": " + this.getStartOffset() + ",\"length\": "
-        + getLength() + ", \"distCached\": " + distCached + "}" ;
-  }
-
-  public FragmentProto getProto() {
-    if (builder == null) {
-      builder = FragmentProto.newBuilder();
-    }
-    builder.setId(this.tableName);
-    builder.setStartOffset(this.startOffset);
-    builder.setLength(this.length);
-    builder.setPath(this.uri.toString());
-    builder.setDistCached(this.distCached);
-    if(diskIds != null) {
-      List<Integer> idList = new ArrayList<Integer>();
-      for(int eachId: diskIds) {
-        idList.add(eachId);
-      }
-      builder.addAllDiskIds(idList);
-    }
-
-    if(hosts != null) {
-      builder.addAllHosts(TUtil.newList(hosts));
-    }
-
-    return builder.build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
index b392e4e..2caa737 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -18,14 +18,18 @@
 
 package org.apache.tajo.storage;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -34,17 +38,22 @@ public class MergeScanner implements Scanner {
   private Configuration conf;
   private TableMeta meta;
   private Schema schema;
-  private List<Fragment> fragments;
-  private Iterator<Fragment> iterator;
-  private Fragment currentFragment;
+  private List<FileFragment> fragments;
+  private Iterator<FileFragment> iterator;
+  private FileFragment currentFragment;
   private Scanner currentScanner;
   private Tuple tuple;
 
-  public MergeScanner(Configuration conf, TableMeta meta, Schema schema, Collection<Fragment> fragments) {
+  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, Collection<FileFragment> rawFragmentList)
+      throws IOException {
     this.conf = conf;
-    this.meta = meta;
     this.schema = schema;
-    this.fragments = new ArrayList<Fragment>(fragments);
+    this.meta = meta;
+    this.fragments = Lists.newArrayList();
+    for (Fragment f : rawFragmentList) {
+      fragments.add((FileFragment) f);
+    }
+
     iterator = this.fragments.iterator();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index 0eb39c1..a484643 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -31,6 +31,7 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.datum.ProtobufDatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.BitArray;
 
 import java.io.File;
@@ -58,15 +59,15 @@ public class RawFile {
     private boolean eof = false;
     private long fileSize;
 
-    public RawFileScanner(Configuration conf, TableMeta meta, Schema schema, Path path) throws IOException {
-      super(conf, meta, schema, null);
+    public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
+      super(conf, schema, meta, null);
       this.path = path;
       init();
     }
 
     @SuppressWarnings("unused")
-    public RawFileScanner(Configuration conf, TableMeta meta, Schema schema, Fragment fragment) throws IOException {
-      this(conf, meta, schema, fragment.getPath());
+    public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
+      this(conf, schema, meta, fragment.getPath());
     }
 
     public void init() throws IOException {
@@ -309,8 +310,8 @@ public class RawFile {
 
     private TableStatistics stats;
 
-    public RawFileAppender(Configuration conf, TableMeta meta, Schema schema, Path path) throws IOException {
-      super(conf, meta, schema, path);
+    public RawFileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
+      super(conf, schema, meta, path);
     }
 
     public void init() throws IOException {