You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/10/29 11:25:50 UTC
[2/3] TAJO-287: Improve Fragment to be more generic. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
index 025af87..ce31851 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
@@ -19,8 +19,9 @@
package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -37,15 +38,16 @@ import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.apache.tajo.LocalTajoTestingUtility;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
public class TestLeftOuterNLJoinExec {
private TajoConf conf;
@@ -239,12 +241,12 @@ public class TestLeftOuterNLJoinExec {
@Test
public final void testLeftOuterNLJoinExec0() throws IOException, PlanningException {
- Fragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(),
+ FileFragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(),
Integer.MAX_VALUE);
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
+ FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuterNLJoinExec0");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -279,12 +281,12 @@ public class TestLeftOuterNLJoinExec {
@Test
public final void testLeftOuterNLJoinExec1() throws IOException, PlanningException {
- Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+ FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
Integer.MAX_VALUE);
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+ FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec1");
@@ -323,12 +325,12 @@ public class TestLeftOuterNLJoinExec {
@Test
public final void testLeftOuter_NLJoinExec2() throws IOException, PlanningException {
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
Integer.MAX_VALUE);
- Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+ FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+ FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec2");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -367,12 +369,12 @@ public class TestLeftOuterNLJoinExec {
@Test
public final void testLeftOuter_NLJoinExec3() throws IOException, PlanningException {
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
Integer.MAX_VALUE);
- Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+ FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+ FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec3");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -410,12 +412,12 @@ public class TestLeftOuterNLJoinExec {
@Test
public final void testLeftOuter_NLJoinExec4() throws IOException, PlanningException {
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
Integer.MAX_VALUE);
- Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+ FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
+ FileFragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec4");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index 5e72428..e6dd0a5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
@@ -155,9 +156,9 @@ public class TestMergeJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
- Fragment[] empFrags = sm.splitNG(conf, "e", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE);
- Fragment[] peopleFrags = sm.splitNG(conf, "p", people.getMeta(), people.getPath(), Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+ FileFragment[] empFrags = sm.splitNG(conf, "e", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE);
+ FileFragment[] peopleFrags = sm.splitNG(conf, "p", people.getMeta(), people.getPath(), Integer.MAX_VALUE);
+ FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testMergeInnerJoin");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
index 3d356ee..50d431c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
@@ -138,12 +139,12 @@ public class TestNLJoinExec {
@Test
public final void testNLCrossJoin() throws IOException, PlanningException {
- Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
+ FileFragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
- Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
+ FileFragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
Integer.MAX_VALUE);
- Fragment [] merged = TUtil.concat(empFrags, peopleFrags);
+ FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testNLCrossJoin");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -166,12 +167,12 @@ public class TestNLJoinExec {
@Test
public final void testNLInnerJoin() throws IOException, PlanningException {
- Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
+ FileFragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
- Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
+ FileFragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
Integer.MAX_VALUE);
- Fragment [] merged = TUtil.concat(empFrags, peopleFrags);
+ FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testNLInnerJoin");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 8b50480..b57ef3a 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -45,6 +45,7 @@ import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.index.bst.BSTIndex;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
@@ -181,11 +182,11 @@ public class TestPhysicalPlanner {
@Test
public final void testCreateScanPlan() throws IOException, PlanningException {
- Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
+ FileFragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
employee.getPath(), Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateScanPlan");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
- new Fragment[] { frags[0] }, workDir);
+ new FileFragment[] { frags[0] }, workDir);
Expr expr = analyzer.parse(QUERIES[0]);
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode =plan.getRootBlock().getRoot();
@@ -210,11 +211,11 @@ public class TestPhysicalPlanner {
@Test
public final void testCreateScanWithFilterPlan() throws IOException, PlanningException {
- Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
+ FileFragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
employee.getPath(), Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateScanWithFilterPlan");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
- new Fragment[] { frags[0] }, workDir);
+ new FileFragment[] { frags[0] }, workDir);
Expr expr = analyzer.parse(QUERIES[16]);
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode =plan.getRootBlock().getRoot();
@@ -237,11 +238,11 @@ public class TestPhysicalPlanner {
@Test
public final void testGroupByPlan() throws IOException, PlanningException {
- Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+ FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByPlan");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
- new Fragment[] { frags[0] }, workDir);
+ new FileFragment[] { frags[0] }, workDir);
ctx.setEnforcer(new Enforcer());
Expr context = analyzer.parse(QUERIES[7]);
LogicalPlan plan = planner.createPlan(context);
@@ -267,12 +268,12 @@ public class TestPhysicalPlanner {
@Test
public final void testHashGroupByPlanWithALLField() throws IOException, PlanningException {
// TODO - currently, this query does not use hash-based group operator.
- Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+ FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(
"target/test-data/testHashGroupByPlanWithALLField");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
- new Fragment[] { frags[0] }, workDir);
+ new FileFragment[] { frags[0] }, workDir);
ctx.setEnforcer(new Enforcer());
Expr expr = analyzer.parse(QUERIES[15]);
LogicalPlan plan = planner.createPlan(expr);
@@ -297,11 +298,11 @@ public class TestPhysicalPlanner {
@Test
public final void testSortGroupByPlan() throws IOException, PlanningException {
- Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+ FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testSortGroupByPlan");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
- new Fragment[]{frags[0]}, workDir);
+ new FileFragment[]{frags[0]}, workDir);
ctx.setEnforcer(new Enforcer());
Expr context = analyzer.parse(QUERIES[7]);
LogicalPlan plan = planner.createPlan(context);
@@ -355,11 +356,11 @@ public class TestPhysicalPlanner {
@Test
public final void testStorePlan() throws IOException, PlanningException {
- Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+ FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlan");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
- new Fragment[] { frags[0] }, workDir);
+ new FileFragment[] { frags[0] }, workDir);
ctx.setEnforcer(new Enforcer());
ctx.setOutputPath(new Path(workDir, "grouped1"));
@@ -376,7 +377,7 @@ public class TestPhysicalPlanner {
exec.next();
exec.close();
- Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(outputMeta, rootNode.getOutSchema(),
+ Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(outputMeta, rootNode.getOutSchema(),
ctx.getOutputPath());
scanner.init();
Tuple tuple;
@@ -396,11 +397,11 @@ public class TestPhysicalPlanner {
@Test
public final void testStorePlanWithRCFile() throws IOException, PlanningException {
- Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+ FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlanWithRCFile");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
- new Fragment[] { frags[0] }, workDir);
+ new FileFragment[] { frags[0] }, workDir);
ctx.setEnforcer(new Enforcer());
ctx.setOutputPath(new Path(workDir, "grouped2"));
@@ -416,7 +417,7 @@ public class TestPhysicalPlanner {
exec.next();
exec.close();
- Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(outputMeta, rootNode.getOutSchema(),
+ Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(outputMeta, rootNode.getOutSchema(),
ctx.getOutputPath());
scanner.init();
Tuple tuple;
@@ -436,11 +437,11 @@ public class TestPhysicalPlanner {
@Test
public final void testPartitionedStorePlan() throws IOException, PlanningException {
- Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+ FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
Integer.MAX_VALUE);
QueryUnitAttemptId id = LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testPartitionedStorePlan");
- TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new Fragment[] { frags[0] }, workDir);
+ TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new FileFragment[] { frags[0] }, workDir);
ctx.setEnforcer(new Enforcer());
Expr context = analyzer.parse(QUERIES[7]);
LogicalPlan plan = planner.createPlan(context);
@@ -468,12 +469,12 @@ public class TestPhysicalPlanner {
FileStatus [] list = fs.listStatus(path);
assertEquals(numPartitions, list.length);
- Fragment [] fragments = new Fragment[list.length];
+ FileFragment[] fragments = new FileFragment[list.length];
int i = 0;
for (FileStatus status : list) {
- fragments[i++] = new Fragment("partition", status.getPath(), 0, status.getLen());
+ fragments[i++] = new FileFragment("partition", status.getPath(), 0, status.getLen());
}
- Scanner scanner = new MergeScanner(conf, outputMeta,rootNode.getOutSchema(), TUtil.newList(fragments));
+ Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), outputMeta, TUtil.newList(fragments));
scanner.init();
Tuple tuple;
@@ -494,13 +495,13 @@ public class TestPhysicalPlanner {
@Test
public final void testPartitionedStorePlanWithEmptyGroupingSet()
throws IOException, PlanningException {
- Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+ FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
Integer.MAX_VALUE);
QueryUnitAttemptId id = LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan);
Path workDir = CommonTestingUtil.getTestDir(
"target/test-data/testPartitionedStorePlanWithEmptyGroupingSet");
- TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new Fragment[] { frags[0] }, workDir);
+ TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new FileFragment[] { frags[0] }, workDir);
ctx.setEnforcer(new Enforcer());
Expr expr = analyzer.parse(QUERIES[14]);
LogicalPlan plan = planner.createPlan(expr);
@@ -526,12 +527,12 @@ public class TestPhysicalPlanner {
FileStatus [] list = fs.listStatus(path);
assertEquals(numPartitions, list.length);
- Fragment [] fragments = new Fragment[list.length];
+ FileFragment[] fragments = new FileFragment[list.length];
int i = 0;
for (FileStatus status : list) {
- fragments[i++] = new Fragment("partition", status.getPath(), 0, status.getLen());
+ fragments[i++] = new FileFragment("partition", status.getPath(), 0, status.getLen());
}
- Scanner scanner = new MergeScanner(conf, outputMeta,rootNode.getOutSchema(), TUtil.newList(fragments));
+ Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), outputMeta, TUtil.newList(fragments));
scanner.init();
Tuple tuple;
i = 0;
@@ -550,11 +551,11 @@ public class TestPhysicalPlanner {
@Test
public final void testAggregationFunction() throws IOException, PlanningException {
- Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+ FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testAggregationFunction");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
- new Fragment[] { frags[0] }, workDir);
+ new FileFragment[] { frags[0] }, workDir);
ctx.setEnforcer(new Enforcer());
Expr context = analyzer.parse(QUERIES[8]);
LogicalPlan plan = planner.createPlan(context);
@@ -584,11 +585,11 @@ public class TestPhysicalPlanner {
@Test
public final void testCountFunction() throws IOException, PlanningException {
- Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+ FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCountFunction");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
- new Fragment[] { frags[0] }, workDir);
+ new FileFragment[] { frags[0] }, workDir);
ctx.setEnforcer(new Enforcer());
Expr context = analyzer.parse(QUERIES[9]);
LogicalPlan plan = planner.createPlan(context);
@@ -616,11 +617,11 @@ public class TestPhysicalPlanner {
@Test
public final void testGroupByWithNullValue() throws IOException, PlanningException {
- Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+ FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByWithNullValue");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
- new Fragment[] { frags[0] }, workDir);
+ new FileFragment[] { frags[0] }, workDir);
ctx.setEnforcer(new Enforcer());
Expr context = analyzer.parse(QUERIES[11]);
LogicalPlan plan = planner.createPlan(context);
@@ -640,11 +641,11 @@ public class TestPhysicalPlanner {
@Test
public final void testUnionPlan() throws IOException, PlanningException {
- Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
+ FileFragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testUnionPlan");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
- new Fragment[] { frags[0] }, workDir);
+ new FileFragment[] { frags[0] }, workDir);
Expr context = analyzer.parse(QUERIES[0]);
LogicalPlan plan = planner.createPlan(context);
LogicalNode rootNode = optimizer.optimize(plan);
@@ -668,7 +669,7 @@ public class TestPhysicalPlanner {
public final void testEvalExpr() throws IOException, PlanningException {
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testEvalExpr");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
- new Fragment[] { }, workDir);
+ new FileFragment[] { }, workDir);
Expr expr = analyzer.parse(QUERIES[12]);
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode = optimizer.optimize(plan);
@@ -700,11 +701,11 @@ public class TestPhysicalPlanner {
//@Test
public final void testCreateIndex() throws IOException, PlanningException {
- Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
+ FileFragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateIndex");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
- new Fragment[] {frags[0]}, workDir);
+ new FileFragment[] {frags[0]}, workDir);
Expr context = analyzer.parse(createIndexStmt[0]);
LogicalPlan plan = planner.createPlan(context);
LogicalNode rootNode = optimizer.optimize(plan);
@@ -726,12 +727,12 @@ public class TestPhysicalPlanner {
@Test
public final void testDuplicateEliminate() throws IOException, PlanningException {
- Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
+ FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testDuplicateEliminate");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
- new Fragment[] {frags[0]}, workDir);
+ new FileFragment[] {frags[0]}, workDir);
ctx.setEnforcer(new Enforcer());
Expr expr = analyzer.parse(duplicateElimination[0]);
LogicalPlan plan = planner.createPlan(expr);
@@ -759,12 +760,12 @@ public class TestPhysicalPlanner {
@Test
public final void testIndexedStoreExec() throws IOException, PlanningException {
- Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
+ FileFragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
employee.getPath(), Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testIndexedStoreExec");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
- new Fragment[] {frags[0]}, workDir);
+ new FileFragment[] {frags[0]}, workDir);
ctx.setEnforcer(new Enforcer());
Expr context = analyzer.parse(SORT_QUERY[0]);
LogicalPlan plan = planner.createPlan(context);
@@ -848,7 +849,7 @@ public class TestPhysicalPlanner {
@Test
public final void testSortEnforcer() throws IOException, PlanningException {
- Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
+ FileFragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
employee.getPath(), Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testSortEnforcer");
@@ -862,7 +863,7 @@ public class TestPhysicalPlanner {
Enforcer enforcer = new Enforcer();
enforcer.enforceSortAlgorithm(sortNode.getPID(), SortAlgorithm.IN_MEMORY_SORT);
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
- new Fragment[] {frags[0]}, workDir);
+ new FileFragment[] {frags[0]}, workDir);
ctx.setEnforcer(enforcer);
PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
@@ -883,7 +884,7 @@ public class TestPhysicalPlanner {
enforcer = new Enforcer();
enforcer.enforceSortAlgorithm(sortNode.getPID(), SortAlgorithm.MERGE_SORT);
ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
- new Fragment[] {frags[0]}, workDir);
+ new FileFragment[] {frags[0]}, workDir);
ctx.setEnforcer(enforcer);
phyPlanner = new PhysicalPlannerImpl(conf,sm);
@@ -897,7 +898,7 @@ public class TestPhysicalPlanner {
@Test
public final void testGroupByEnforcer() throws IOException, PlanningException {
- Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), Integer.MAX_VALUE);
+ FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByEnforcer");
Expr context = analyzer.parse(QUERIES[7]);
@@ -910,7 +911,7 @@ public class TestPhysicalPlanner {
Enforcer enforcer = new Enforcer();
enforcer.enforceHashAggregation(groupByNode.getPID());
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
- new Fragment[] {frags[0]}, workDir);
+ new FileFragment[] {frags[0]}, workDir);
ctx.setEnforcer(enforcer);
PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
@@ -931,7 +932,7 @@ public class TestPhysicalPlanner {
enforcer = new Enforcer();
enforcer.enforceSortAggregation(groupByNode.getPID(), null);
ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
- new Fragment[] {frags[0]}, workDir);
+ new FileFragment[] {frags[0]}, workDir);
ctx.setEnforcer(enforcer);
phyPlanner = new PhysicalPlannerImpl(conf,sm);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
index c95881a..d582e2b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
@@ -19,8 +19,9 @@
package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -37,14 +38,15 @@ import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.apache.tajo.LocalTajoTestingUtility;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
// this is not a physical operator in itself, but it uses the HashLeftOuterJoinExec with switched inputs order
public class TestRightOuterHashJoinExec {
@@ -217,12 +219,12 @@ public class TestRightOuterHashJoinExec {
@Test
public final void testRightOuter_HashJoinExec0() throws IOException, PlanningException {
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
Integer.MAX_VALUE);
- Fragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(),
+ FileFragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(),
Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
+ FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestRightOuter_HashJoinExec0");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -257,12 +259,12 @@ public class TestRightOuterHashJoinExec {
@Test
public final void testRightOuter_HashJoinExec1() throws IOException, PlanningException {
- Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+ FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
Integer.MAX_VALUE);
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+ FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestRightOuter_HashJoinExec1");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -297,12 +299,12 @@ public class TestRightOuterHashJoinExec {
@Test
public final void testRightOuter_HashJoinExec2() throws IOException, PlanningException {
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
Integer.MAX_VALUE);
- Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+ FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+ FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestRightOuter_HashJoinExec2");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
index bf08479..5bbb4aa 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
@@ -299,9 +300,9 @@ public class TestRightOuterMergeJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
- Fragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin0");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -334,9 +335,9 @@ public class TestRightOuterMergeJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
- Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin1");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -368,9 +369,9 @@ public class TestRightOuterMergeJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
- Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin2");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -402,9 +403,9 @@ public class TestRightOuterMergeJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
- Fragment[] dep4Frags = StorageManager.splitNG(conf, "dep4", dep4.getMeta(), dep4.getPath(), Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(emp3Frags, dep4Frags);
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] dep4Frags = StorageManager.splitNG(conf, "dep4", dep4.getMeta(), dep4.getPath(), Integer.MAX_VALUE);
+ FileFragment[] merged = TUtil.concat(emp3Frags, dep4Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuter_MergeJoin3");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -437,10 +438,10 @@ public class TestRightOuterMergeJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
- Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+ FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuter_MergeJoin4");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -472,10 +473,10 @@ public class TestRightOuterMergeJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
- Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(phone3Frags,emp3Frags);
+ FileFragment[] merged = TUtil.concat(phone3Frags,emp3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin5");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index 3dc3a4a..45badd5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
@@ -108,10 +109,10 @@ public class TestSortExec {
@Test
public final void testNext() throws IOException, PlanningException {
- Fragment [] frags = sm.splitNG(conf, "employee", employeeMeta, tablePath, Integer.MAX_VALUE);
+ FileFragment[] frags = sm.splitNG(conf, "employee", employeeMeta, tablePath, Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestSortExec");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility
- .newQueryUnitAttemptId(), new Fragment[] { frags[0] }, workDir);
+ .newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir);
ctx.setEnforcer(new Enforcer());
Expr context = analyzer.parse(QUERIES[0]);
LogicalPlan plan = planner.createPlan(context);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFileFragment.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFileFragment.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFileFragment.java
new file mode 100644
index 0000000..684cb58
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFileFragment.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.SortedSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestFileFragment {
+ private Path path;
+
+ @Before
+ public final void setUp() throws Exception {
+ path = CommonTestingUtil.getTestDir();
+ }
+
+ @Test
+ public final void testGetAndSetFields() {
+ FileFragment fragment1 = new FileFragment("table1_1", new Path(path, "table0"), 0, 500);
+ fragment1.setDistCached();
+
+ assertEquals("table1_1", fragment1.getTableName());
+ assertEquals(new Path(path, "table0"), fragment1.getPath());
+ assertTrue(fragment1.isDistCached());
+ assertTrue(0 == fragment1.getStartKey());
+ assertTrue(500 == fragment1.getEndKey());
+ }
+
+ @Test
+ public final void testGetProtoAndRestore() {
+ FileFragment fragment = new FileFragment("table1_1", new Path(path, "table0"), 0, 500);
+ fragment.setDistCached();
+
+ FileFragment fragment1 = FragmentConvertor.convert(FileFragment.class, fragment.getProto());
+ assertEquals("table1_1", fragment1.getTableName());
+ assertEquals(new Path(path, "table0"), fragment1.getPath());
+ assertTrue(fragment.isDistCached());
+ assertTrue(0 == fragment1.getStartKey());
+ assertTrue(500 == fragment1.getEndKey());
+ }
+
+ @Test
+ public final void testCompareTo() {
+ final int num = 10;
+ FileFragment[] tablets = new FileFragment[num];
+ for (int i = num - 1; i >= 0; i--) {
+ tablets[i] = new FileFragment("tablet1_"+i, new Path(path, "tablet0"), i * 500, (i+1) * 500);
+ }
+
+ Arrays.sort(tablets);
+
+ for(int i = 0; i < num; i++) {
+ assertEquals("tablet1_"+i, tablets[i].getTableName());
+ }
+ }
+
+ @Test
+ public final void testCompareTo2() {
+ final int num = 1860;
+ FileFragment[] tablets = new FileFragment[num];
+ for (int i = num - 1; i >= 0; i--) {
+ tablets[i] = new FileFragment("tablet1_"+i, new Path(path, "tablet0"), (long)i * 6553500, (long)(i+1) * 6553500);
+ }
+
+ SortedSet sortedSet = Sets.newTreeSet();
+ for (FileFragment frag : tablets) {
+ sortedSet.add(frag);
+ }
+ assertEquals(num, sortedSet.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java
deleted file mode 100644
index 630acd1..0000000
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.collect.Sets;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.SortedSet;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestFragment {
- private Schema schema1;
- private Path path;
-
- @Before
- public final void setUp() throws Exception {
- schema1 = new Schema();
- schema1.addColumn("id", Type.INT4);
- schema1.addColumn("name", Type.TEXT);
- path = CommonTestingUtil.getTestDir();
- }
-
- @Test
- public final void testGetAndSetFields() {
- Fragment fragment1 = new Fragment("table1_1", new Path(path, "table0"), 0, 500);
- fragment1.setDistCached();
-
- assertEquals("table1_1", fragment1.getName());
- assertEquals(new Path(path, "table0"), fragment1.getPath());
- assertTrue(fragment1.isDistCached());
- assertTrue(0 == fragment1.getStartOffset());
- assertTrue(500 == fragment1.getLength());
- }
-
- @Test
- public final void testTabletTabletProto() {
- Fragment fragment0 = new Fragment("table1_1", new Path(path, "table0"), 0, 500);
- fragment0.setDistCached();
- Fragment fragment1 = new Fragment(fragment0.getProto());
- assertEquals("table1_1", fragment1.getName());
- assertEquals(new Path(path, "table0"), fragment1.getPath());
- assertTrue(fragment1.isDistCached());
- assertTrue(0 == fragment1.getStartOffset());
- assertTrue(500 == fragment1.getLength());
- }
-
- @Test
- public final void testCompareTo() {
- final int num = 10;
- Fragment [] tablets = new Fragment[num];
- for (int i = num - 1; i >= 0; i--) {
- tablets[i] = new Fragment("tablet1_"+i, new Path(path, "tablet0"), i * 500, (i+1) * 500);
- }
-
- Arrays.sort(tablets);
-
- for(int i = 0; i < num; i++) {
- assertEquals("tablet1_"+i, tablets[i].getName());
- }
- }
-
- @Test
- public final void testCompareTo2() {
- final int num = 1860;
- Fragment [] tablets = new Fragment[num];
- for (int i = num - 1; i >= 0; i--) {
- tablets[i] = new Fragment("tablet1_"+i, new Path(path, "tablet0"), (long)i * 6553500, (long)(i+1) * 6553500);
- }
-
- SortedSet sortedSet = Sets.newTreeSet();
- for (Fragment frag : tablets) {
- sortedSet.add(frag);
- }
- assertEquals(num, sortedSet.size());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
index 2678a54..99d89c0 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
@@ -35,6 +35,7 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.FileUtil;
import org.junit.After;
import org.junit.Before;
@@ -106,7 +107,7 @@ public class TestRowFile {
TableProto proto = (TableProto) FileUtil.loadProto(
cluster.getDefaultFileSystem(), metaPath, TableProto.getDefaultInstance());
meta = new TableMeta(proto);
- Fragment fragment = new Fragment("test.tbl", dataPath, 0, file.getLen());
+ FileFragment fragment = new FileFragment("test.tbl", dataPath, 0, file.getLen());
int tupleCnt = 0;
start = System.currentTimeMillis();
@@ -124,8 +125,8 @@ public class TestRowFile {
long fileLen = file.getLen()/13;
for (int i = 0; i < 13; i++) {
- fragment = new Fragment("test.tbl", dataPath, fileStart, fileLen);
- scanner = new RowFile.RowFileScanner(conf, meta, schema, fragment);
+ fragment = new FileFragment("test.tbl", dataPath, fileStart, fileLen);
+ scanner = new RowFile.RowFileScanner(conf, schema, meta, fragment);
scanner.init();
while ((tuple=scanner.next()) != null) {
if (!idSet.remove(tuple.get(0).asInt4())) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 3b568fc..6934872 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -37,6 +37,7 @@ import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.physical.*;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.index.bst.BSTIndex;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.worker.dataserver.retriever.FileChunk;
@@ -127,11 +128,11 @@ public class TestRangeRetrieverHandler {
TableDesc employee = new TableDesc("employee", schema, employeeMeta, tableDir);
catalog.addTable(employee);
- Fragment[] frags = StorageManager.splitNG(conf, "employee", employeeMeta, tableDir, Integer.MAX_VALUE);
+ FileFragment[] frags = StorageManager.splitNG(conf, "employee", employeeMeta, tableDir, Integer.MAX_VALUE);
TaskAttemptContext
ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(),
- new Fragment[] {frags[0]}, testDir);
+ new FileFragment[] {frags[0]}, testDir);
Expr expr = analyzer.parse(SORT_QUERY[0]);
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode = optimizer.optimize(plan);
@@ -240,11 +241,11 @@ public class TestRangeRetrieverHandler {
TableDesc employee = new TableDesc("employee", schema, meta, tablePath);
catalog.addTable(employee);
- Fragment[] frags = sm.splitNG(conf, "employee", meta, tablePath, Integer.MAX_VALUE);
+ FileFragment[] frags = sm.splitNG(conf, "employee", meta, tablePath, Integer.MAX_VALUE);
TaskAttemptContext
ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(),
- new Fragment[] {frags[0]}, testDir);
+ new FileFragment[] {frags[0]}, testDir);
Expr expr = analyzer.parse(SORT_QUERY[1]);
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode = optimizer.optimize(plan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
index 0e695a3..efdd023 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
@@ -30,17 +30,20 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.annotation.ForSplitableStore;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.util.Bytes;
import org.apache.tajo.util.FileUtil;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
public abstract class AbstractStorageManager {
private final Log LOG = LogFactory.getLog(AbstractStorageManager.class);
@@ -82,16 +85,23 @@ public abstract class AbstractStorageManager {
LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
}
- public Scanner getScanner(TableMeta meta, Schema schema, Path path)
+ public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
throws IOException {
FileSystem fs = path.getFileSystem(conf);
FileStatus status = fs.getFileStatus(path);
- Fragment fragment = new Fragment(path.getName(), path, 0, status.getLen());
+ FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
return getScanner(meta, schema, fragment);
}
- public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment)
- throws IOException {
+ public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment) throws IOException {
+ return getScanner(meta, schema, FragmentConvertor.convert(conf, meta.getStoreType(), fragment), schema);
+ }
+
+ public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException {
+ return getScanner(meta, schema, FragmentConvertor.convert(conf, meta.getStoreType(), fragment), target);
+ }
+
+ public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException {
return getScanner(meta, schema, fragment, schema);
}
@@ -175,51 +185,51 @@ public abstract class AbstractStorageManager {
return meta;
}
- public Fragment[] split(String tableName) throws IOException {
+ public FileFragment[] split(String tableName) throws IOException {
Path tablePath = new Path(tableBaseDir, tableName);
return split(tableName, tablePath, fs.getDefaultBlockSize());
}
- public Fragment[] split(String tableName, long fragmentSize) throws IOException {
+ public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
Path tablePath = new Path(tableBaseDir, tableName);
return split(tableName, tablePath, fragmentSize);
}
- public Fragment[] splitBroadcastTable(Path tablePath) throws IOException {
+ public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException {
FileSystem fs = tablePath.getFileSystem(conf);
TableMeta meta = getTableMeta(tablePath);
- List<Fragment> listTablets = new ArrayList<Fragment>();
- Fragment tablet;
+ List<FileFragment> listTablets = new ArrayList<FileFragment>();
+ FileFragment tablet;
FileStatus[] fileLists = fs.listStatus(tablePath);
for (FileStatus file : fileLists) {
- tablet = new Fragment(tablePath.getName(), file.getPath(), 0, file.getLen());
+ tablet = new FileFragment(tablePath.getName(), file.getPath(), 0, file.getLen());
listTablets.add(tablet);
}
- Fragment[] tablets = new Fragment[listTablets.size()];
+ FileFragment[] tablets = new FileFragment[listTablets.size()];
listTablets.toArray(tablets);
return tablets;
}
- public Fragment[] split(Path tablePath) throws IOException {
+ public FileFragment[] split(Path tablePath) throws IOException {
FileSystem fs = tablePath.getFileSystem(conf);
return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
}
- public Fragment[] split(String tableName, Path tablePath) throws IOException {
+ public FileFragment[] split(String tableName, Path tablePath) throws IOException {
return split(tableName, tablePath, fs.getDefaultBlockSize());
}
- private Fragment[] split(String tableName, Path tablePath, long size)
+ private FileFragment[] split(String tableName, Path tablePath, long size)
throws IOException {
FileSystem fs = tablePath.getFileSystem(conf);
TableMeta meta = getTableMeta(tablePath);
long defaultBlockSize = size;
- List<Fragment> listTablets = new ArrayList<Fragment>();
- Fragment tablet;
+ List<FileFragment> listTablets = new ArrayList<FileFragment>();
+ FileFragment tablet;
FileStatus[] fileLists = fs.listStatus(tablePath);
for (FileStatus file : fileLists) {
@@ -227,31 +237,31 @@ public abstract class AbstractStorageManager {
long start = 0;
if (remainFileSize > defaultBlockSize) {
while (remainFileSize > defaultBlockSize) {
- tablet = new Fragment(tableName, file.getPath(), start, defaultBlockSize);
+ tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
listTablets.add(tablet);
start += defaultBlockSize;
remainFileSize -= defaultBlockSize;
}
- listTablets.add(new Fragment(tableName, file.getPath(), start, remainFileSize));
+ listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
} else {
- listTablets.add(new Fragment(tableName, file.getPath(), 0, remainFileSize));
+ listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
}
}
- Fragment[] tablets = new Fragment[listTablets.size()];
+ FileFragment[] tablets = new FileFragment[listTablets.size()];
listTablets.toArray(tablets);
return tablets;
}
- public static Fragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
+ public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
Path tablePath, long size)
throws IOException {
FileSystem fs = tablePath.getFileSystem(conf);
long defaultBlockSize = size;
- List<Fragment> listTablets = new ArrayList<Fragment>();
- Fragment tablet;
+ List<FileFragment> listTablets = new ArrayList<FileFragment>();
+ FileFragment tablet;
FileStatus[] fileLists = fs.listStatus(tablePath);
for (FileStatus file : fileLists) {
@@ -259,18 +269,18 @@ public abstract class AbstractStorageManager {
long start = 0;
if (remainFileSize > defaultBlockSize) {
while (remainFileSize > defaultBlockSize) {
- tablet = new Fragment(tableName, file.getPath(), start, defaultBlockSize);
+ tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
listTablets.add(tablet);
start += defaultBlockSize;
remainFileSize -= defaultBlockSize;
}
- listTablets.add(new Fragment(tableName, file.getPath(), start, remainFileSize));
+ listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
} else {
- listTablets.add(new Fragment(tableName, file.getPath(), 0, remainFileSize));
+ listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
}
}
- Fragment[] tablets = new Fragment[listTablets.size()];
+ FileFragment[] tablets = new FileFragment[listTablets.size()];
listTablets.toArray(tablets);
return tablets;
@@ -396,24 +406,10 @@ public abstract class AbstractStorageManager {
* @return is this file isSplittable?
*/
protected boolean isSplittable(TableMeta meta, Schema schema, Path filename) throws IOException {
- Scanner scanner = getScanner(meta, schema, filename);
+ Scanner scanner = getFileScanner(meta, schema, filename);
return scanner.isSplittable();
}
- protected boolean isSplittable(CatalogProtos.StoreType storeType) throws IOException {
- Method[] methods = getScannerClass(storeType).getMethods();
-
- for (Method method : methods) {
- ForSplitableStore annos = method.getAnnotation(ForSplitableStore.class);
- if (annos != null) {
- return true;
- }
- }
-
- return false;
- }
-
-
@Deprecated
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
@@ -444,17 +440,17 @@ public abstract class AbstractStorageManager {
* A factory that makes the split for this class. It can be overridden
* by sub-classes to make sub-types
*/
- protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length) {
- return new Fragment(fragmentId, file, start, length);
+ protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length) {
+ return new FileFragment(fragmentId, file, start, length);
}
- protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, BlockLocation blockLocation,
+ protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, BlockLocation blockLocation,
int[] diskIds) throws IOException {
- return new Fragment(fragmentId, file, blockLocation, diskIds);
+ return new FileFragment(fragmentId, file, blockLocation, diskIds);
}
// for Non Splittable. eg, compressed gzip TextFile
- protected Fragment makeNonSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
+ protected FileFragment makeNonSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
BlockLocation[] blkLocations) throws IOException {
Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
@@ -485,7 +481,7 @@ public abstract class AbstractStorageManager {
hosts[i] = entry.getKey();
hostsBlockCount[i] = entry.getValue();
}
- return new Fragment(fragmentId, file, start, length, hosts, hostsBlockCount);
+ return new FileFragment(fragmentId, file, start, length, hosts, hostsBlockCount);
}
/**
@@ -536,9 +532,9 @@ public abstract class AbstractStorageManager {
* Generate the map of host and make them into Volume Ids.
*
*/
- private Map<String, Set<Integer>> getVolumeMap(List<Fragment> frags) {
+ private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) {
Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
- for (Fragment frag : frags) {
+ for (FileFragment frag : frags) {
String[] hosts = frag.getHosts();
int[] diskIds = frag.getDiskIds();
for (int i = 0; i < hosts.length; i++) {
@@ -561,10 +557,10 @@ public abstract class AbstractStorageManager {
*
* @throws IOException
*/
- public List<Fragment> getSplits(String tableName, TableMeta meta, Schema schema, Path inputPath) throws IOException {
+ public List<FileFragment> getSplits(String tableName, TableMeta meta, Schema schema, Path inputPath) throws IOException {
// generate splits'
- List<Fragment> splits = new ArrayList<Fragment>();
+ List<FileFragment> splits = new ArrayList<FileFragment>();
List<FileStatus> files = listStatus(inputPath);
FileSystem fs = inputPath.getFileSystem(conf);
for (FileStatus file : files) {
@@ -613,22 +609,22 @@ public abstract class AbstractStorageManager {
private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
Configuration.class,
- TableMeta.class,
Schema.class,
- Fragment.class
+ TableMeta.class,
+ FileFragment.class
};
private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
Configuration.class,
- TableMeta.class,
Schema.class,
+ TableMeta.class,
Path.class
};
/**
* create a scanner instance.
*/
- public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, TableMeta meta, Schema schema,
+ public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta,
Fragment fragment) {
T result;
try {
@@ -638,7 +634,7 @@ public abstract class AbstractStorageManager {
meth.setAccessible(true);
CONSTRUCTOR_CACHE.put(theClass, meth);
}
- result = meth.newInstance(new Object[]{conf, meta, schema, fragment});
+ result = meth.newInstance(new Object[]{conf, schema, meta, fragment});
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -659,7 +655,7 @@ public abstract class AbstractStorageManager {
meth.setAccessible(true);
CONSTRUCTOR_CACHE.put(theClass, meth);
}
- result = meth.newInstance(new Object[]{conf, meta, schema, path});
+ result = meth.newInstance(new Object[]{conf, schema, meta, path});
} catch (Exception e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
index 5ac989d..6760367 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -39,9 +39,9 @@ import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.ProtobufDatum;
import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.annotation.ForSplitableStore;
import org.apache.tajo.storage.compress.CodecPool;
import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.Bytes;
import java.io.*;
@@ -75,8 +75,8 @@ public class CSVFile {
private byte[] nullChars;
private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
- public CSVAppender(Configuration conf, final TableMeta meta, final Schema schema, final Path path) throws IOException {
- super(conf, meta, schema, path);
+ public CSVAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) throws IOException {
+ super(conf, schema, meta, path);
this.fs = path.getFileSystem(conf);
this.meta = meta;
this.schema = schema;
@@ -315,11 +315,10 @@ public class CSVFile {
}
}
- @ForSplitableStore
public static class CSVScanner extends FileScanner implements SeekableScanner {
- public CSVScanner(Configuration conf, final TableMeta meta, final Schema schema, final Fragment fragment)
+ public CSVScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment)
throws IOException {
- super(conf, meta, schema, fragment);
+ super(conf, schema, meta, fragment);
factory = new CompressionCodecFactory(conf);
codec = factory.getCodec(fragment.getPath());
if (isCompress() && !(codec instanceof SplittableCompressionCodec)) {
@@ -365,11 +364,11 @@ public class CSVFile {
@Override
public void init() throws IOException {
- // Fragment information
+ // FileFragment information
fs = fragment.getPath().getFileSystem(conf);
fis = fs.open(fragment.getPath());
- startOffset = fragment.getStartOffset();
- length = fragment.getLength();
+ startOffset = fragment.getStartKey();
+ length = fragment.getEndKey();
if(startOffset > 0) startOffset--; // prev line feed
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
index 55a9bf3..064841f 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
@@ -35,7 +35,7 @@ public abstract class FileAppender implements Appender {
protected boolean enabledStats;
- public FileAppender(Configuration conf, TableMeta meta, Schema schema, Path path) {
+ public FileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) {
this.conf = conf;
this.meta = meta;
this.schema = schema;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
index 9c22b4f..a9cfe1a 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.fragment.FileFragment;
import java.io.IOException;
@@ -30,12 +31,12 @@ public abstract class FileScanner implements Scanner {
protected final Configuration conf;
protected final TableMeta meta;
protected final Schema schema;
- protected final Fragment fragment;
+ protected final FileFragment fragment;
protected final int columnNum;
protected Column [] targets;
- public FileScanner(Configuration conf, final TableMeta meta, final Schema schema, final Fragment fragment) {
+ public FileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment) {
this.conf = conf;
this.meta = meta;
this.schema = schema;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
deleted file mode 100644
index f40c9a7..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.base.Objects;
-import com.google.gson.annotations.Expose;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import org.apache.tajo.util.TUtil;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-public class Fragment implements Comparable<Fragment> {
- protected FragmentProto.Builder builder = null;
-
- @Expose private String tableName; // required
- @Expose private Path uri; // required
- @Expose private Long startOffset; // required
- @Expose private Long length; // required
- @Expose private boolean distCached = false; // optional
-
- private String[] hosts; // Datanode hostnames
- @Expose private int[] hostsBlockCount; // list of block count of hosts
- @Expose private int[] diskIds;
-
- public Fragment() {
- builder = FragmentProto.newBuilder();
- }
-
- public Fragment(String tableName, Path uri, BlockLocation blockLocation, int[] diskIds)
- throws IOException {
- this();
- this.set(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(),
- blockLocation.getHosts(), diskIds);
- }
-
- // Non splittable
- public Fragment(String tableName, Path uri, long start, long length, String[] hosts,
- int[] hostsBlockCount) {
- this();
- this.set(tableName, uri, start, length, null, null);
- this.hosts = hosts;
- this.hostsBlockCount = hostsBlockCount;
- }
-
- public Fragment(String fragmentId, Path path, long start, long length) {
- this();
- this.set(fragmentId, path, start, length, null, null);
- }
-
- public Fragment(FragmentProto proto) {
- this();
- int[] diskIds = new int[proto.getDiskIdsList().size()];
- int i = 0;
- for(Integer eachValue: proto.getDiskIdsList()) {
- diskIds[i++] = eachValue;
- }
- this.set(proto.getId(), new Path(proto.getPath()),
- proto.getStartOffset(), proto.getLength(),
- proto.getHostsList().toArray(new String[]{}),
- diskIds);
- if (proto.hasDistCached() && proto.getDistCached()) {
- distCached = true;
- }
- }
-
- private void set(String tableName, Path path, long start,
- long length, String[] hosts, int[] diskIds) {
- this.tableName = tableName;
- this.uri = path;
- this.startOffset = start;
- this.length = length;
- this.hosts = hosts;
- this.diskIds = diskIds;
- }
-
-
- /**
- * Get the list of hosts (hostname) hosting this block
- */
- public String[] getHosts() {
- if (hosts == null) {
- this.hosts = new String[0];
- }
- return hosts;
- }
-
- /**
- * Get the list of hosts block count
- * if a fragment given multiple block, it returned 'host0:3, host1:1 ...'
- */
- public int[] getHostsBlockCount() {
- if (hostsBlockCount == null) {
- this.hostsBlockCount = new int[getHosts().length];
- Arrays.fill(this.hostsBlockCount, 1);
- }
- return hostsBlockCount;
- }
-
- /**
- * Get the list of Disk Ids
- * Unknown disk is -1. Others 0 ~ N
- */
- public int[] getDiskIds() {
- if (diskIds == null) {
- this.diskIds = new int[getHosts().length];
- Arrays.fill(this.diskIds, -1);
- }
- return diskIds;
- }
-
- public String getName() {
- return this.tableName;
- }
-
- public void setName(String tableName) {
- this.tableName = tableName;
- }
-
- public Path getPath() {
- return this.uri;
- }
-
- public void setPath(Path path) {
- this.uri = path;
- }
-
- public Long getStartOffset() {
- return this.startOffset;
- }
-
- public Long getLength() {
- return this.length;
- }
-
- public Boolean isDistCached() {
- return this.distCached;
- }
-
- public void setDistCached() {
- this.distCached = true;
- }
-
- /**
- *
- * The offset range of tablets <b>MUST NOT</b> be overlapped.
- *
- * @param t
- * @return If the table paths are not same, return -1.
- */
- @Override
- public int compareTo(Fragment t) {
- if (getPath().equals(t.getPath())) {
- long diff = this.getStartOffset() - t.getStartOffset();
- if (diff < 0) {
- return -1;
- } else if (diff > 0) {
- return 1;
- } else {
- return 0;
- }
- } else {
- return -1;
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof Fragment) {
- Fragment t = (Fragment) o;
- if (getPath().equals(t.getPath())
- && TUtil.checkEquals(t.getStartOffset(), this.getStartOffset())
- && TUtil.checkEquals(t.getLength(), this.getLength())
- && TUtil.checkEquals(t.isDistCached(), this.isDistCached())) {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(tableName, uri, startOffset, length, isDistCached());
- }
-
- public Object clone() throws CloneNotSupportedException {
- Fragment frag = (Fragment) super.clone();
- frag.builder = FragmentProto.newBuilder();
- frag.tableName = tableName;
- frag.uri = uri;
- frag.distCached = distCached;
- frag.diskIds = diskIds;
- frag.hosts = hosts;
- frag.hostsBlockCount = hostsBlockCount;
-
- return frag;
- }
-
- @Override
- public String toString() {
- return "\"fragment\": {\"id\": \""+ tableName +"\", \"path\": "
- +getPath() + "\", \"start\": " + this.getStartOffset() + ",\"length\": "
- + getLength() + ", \"distCached\": " + distCached + "}" ;
- }
-
- public FragmentProto getProto() {
- if (builder == null) {
- builder = FragmentProto.newBuilder();
- }
- builder.setId(this.tableName);
- builder.setStartOffset(this.startOffset);
- builder.setLength(this.length);
- builder.setPath(this.uri.toString());
- builder.setDistCached(this.distCached);
- if(diskIds != null) {
- List<Integer> idList = new ArrayList<Integer>();
- for(int eachId: diskIds) {
- idList.add(eachId);
- }
- builder.addAllDiskIds(idList);
- }
-
- if(hosts != null) {
- builder.addAllHosts(TUtil.newList(hosts));
- }
-
- return builder.build();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
index b392e4e..2caa737 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -18,14 +18,18 @@
package org.apache.tajo.storage;
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -34,17 +38,22 @@ public class MergeScanner implements Scanner {
private Configuration conf;
private TableMeta meta;
private Schema schema;
- private List<Fragment> fragments;
- private Iterator<Fragment> iterator;
- private Fragment currentFragment;
+ private List<FileFragment> fragments;
+ private Iterator<FileFragment> iterator;
+ private FileFragment currentFragment;
private Scanner currentScanner;
private Tuple tuple;
- public MergeScanner(Configuration conf, TableMeta meta, Schema schema, Collection<Fragment> fragments) {
+ public MergeScanner(Configuration conf, Schema schema, TableMeta meta, Collection<FileFragment> rawFragmentList)
+ throws IOException {
this.conf = conf;
- this.meta = meta;
this.schema = schema;
- this.fragments = new ArrayList<Fragment>(fragments);
+ this.meta = meta;
+ this.fragments = Lists.newArrayList();
+ for (Fragment f : rawFragmentList) {
+ fragments.add((FileFragment) f);
+ }
+
iterator = this.fragments.iterator();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index 0eb39c1..a484643 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -31,6 +31,7 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.ProtobufDatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.BitArray;
import java.io.File;
@@ -58,15 +59,15 @@ public class RawFile {
private boolean eof = false;
private long fileSize;
- public RawFileScanner(Configuration conf, TableMeta meta, Schema schema, Path path) throws IOException {
- super(conf, meta, schema, null);
+ public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
+ super(conf, schema, meta, null);
this.path = path;
init();
}
@SuppressWarnings("unused")
- public RawFileScanner(Configuration conf, TableMeta meta, Schema schema, Fragment fragment) throws IOException {
- this(conf, meta, schema, fragment.getPath());
+ public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
+ this(conf, schema, meta, fragment.getPath());
}
public void init() throws IOException {
@@ -309,8 +310,8 @@ public class RawFile {
private TableStatistics stats;
- public RawFileAppender(Configuration conf, TableMeta meta, Schema schema, Path path) throws IOException {
- super(conf, meta, schema, path);
+ public RawFileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
+ super(conf, schema, meta, path);
}
public void init() throws IOException {