You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/05/19 06:38:21 UTC
[5/6] tajo git commit: TAJO-1613: Rename StorageManager to Tablespace.
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 8c72d39..7f92667 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -80,7 +80,7 @@ public class TestPhysicalPlanner {
private static SQLAnalyzer analyzer;
private static LogicalPlanner planner;
private static LogicalOptimizer optimizer;
- private static FileStorageManager sm;
+ private static FileTablespace sm;
private static Path testDir;
private static Session session = LocalTajoTestingUtility.createDummySession();
private static QueryContext defaultContext;
@@ -98,7 +98,7 @@ public class TestPhysicalPlanner {
util.startCatalogCluster();
conf = util.getConfiguration();
testDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestPhysicalPlanner");
- sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf);
+ sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf);
catalog = util.getMiniCatalogCluster().getCatalog();
catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
@@ -180,7 +180,7 @@ public class TestPhysicalPlanner {
Schema scoreSchmea = score.getSchema();
TableMeta scoreLargeMeta = CatalogUtil.newTableMeta("RAW", new KeyValueSet());
- Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+ Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
.getAppender(scoreLargeMeta, scoreSchmea, scoreLargePath);
appender.enableStats();
appender.init();
@@ -237,7 +237,7 @@ public class TestPhysicalPlanner {
@Test
public final void testCreateScanPlan() throws IOException, PlanningException {
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
new Path(employee.getPath()), Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testCreateScanPlan");
TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -268,7 +268,7 @@ public class TestPhysicalPlanner {
@Test
public final void testCreateScanWithFilterPlan() throws IOException, PlanningException {
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
new Path(employee.getPath()), Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testCreateScanWithFilterPlan");
TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -297,7 +297,7 @@ public class TestPhysicalPlanner {
@Test
public final void testGroupByPlan() throws IOException, PlanningException {
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testGroupByPlan");
TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -328,7 +328,7 @@ public class TestPhysicalPlanner {
@Test
public final void testHashGroupByPlanWithALLField() throws IOException, PlanningException {
// TODO - currently, this query does not use hash-based group operator.
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY +
"/testHashGroupByPlanWithALLField");
@@ -358,7 +358,7 @@ public class TestPhysicalPlanner {
@Test
public final void testSortGroupByPlan() throws IOException, PlanningException {
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testSortGroupByPlan");
TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -421,7 +421,7 @@ public class TestPhysicalPlanner {
@Test
public final void testStorePlan() throws IOException, PlanningException {
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlan");
TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -442,7 +442,7 @@ public class TestPhysicalPlanner {
exec.next();
exec.close();
- Scanner scanner = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+ Scanner scanner = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
.getFileScanner(outputMeta, rootNode.getOutSchema(), ctx.getOutputPath());
scanner.init();
Tuple tuple;
@@ -467,7 +467,7 @@ public class TestPhysicalPlanner {
TableStats stats = largeScore.getStats();
assertTrue("Checking meaningfulness of test", stats.getNumBytes() > StorageUnit.MB);
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score_large", largeScore.getMeta(),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score_large", largeScore.getMeta(),
new Path(largeScore.getPath()), Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlanWithMaxOutputFileSize");
@@ -502,7 +502,7 @@ public class TestPhysicalPlanner {
// checking the file contents
long totalNum = 0;
for (FileStatus status : fs.listStatus(ctx.getOutputPath().getParent())) {
- Scanner scanner = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getFileScanner(
+ Scanner scanner = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getFileScanner(
CatalogUtil.newTableMeta("CSV"),
rootNode.getOutSchema(),
status.getPath());
@@ -518,7 +518,7 @@ public class TestPhysicalPlanner {
@Test
public final void testStorePlanWithRCFile() throws IOException, PlanningException {
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlanWithRCFile");
TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -539,7 +539,7 @@ public class TestPhysicalPlanner {
exec.next();
exec.close();
- Scanner scanner = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getFileScanner(
+ Scanner scanner = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getFileScanner(
outputMeta, rootNode.getOutSchema(), ctx.getOutputPath());
scanner.init();
Tuple tuple;
@@ -559,7 +559,7 @@ public class TestPhysicalPlanner {
@Test
public final void testEnforceForDefaultColumnPartitionStorePlan() throws IOException, PlanningException {
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlan");
TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -586,7 +586,7 @@ public class TestPhysicalPlanner {
Enforcer enforcer = new Enforcer();
enforcer.enforceColumnPartitionAlgorithm(createTableNode.getPID(), ColumnPartitionAlgorithm.HASH_PARTITION);
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlan");
TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -610,7 +610,7 @@ public class TestPhysicalPlanner {
Enforcer enforcer = new Enforcer();
enforcer.enforceColumnPartitionAlgorithm(createTableNode.getPID(), ColumnPartitionAlgorithm.SORT_PARTITION);
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlan");
TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -626,7 +626,7 @@ public class TestPhysicalPlanner {
@Test
public final void testPartitionedStorePlan() throws IOException, PlanningException {
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
Integer.MAX_VALUE);
TaskAttemptId id = LocalTajoTestingUtility.newTaskAttemptId(masterPlan);
TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), id, new FileFragment[] { frags[0] },
@@ -695,7 +695,7 @@ public class TestPhysicalPlanner {
public final void testPartitionedStorePlanWithMaxFileSize() throws IOException, PlanningException {
// Preparing working dir and input fragments
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score_large", largeScore.getMeta(),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score_large", largeScore.getMeta(),
new Path(largeScore.getPath()), Integer.MAX_VALUE);
TaskAttemptId id = LocalTajoTestingUtility.newTaskAttemptId(masterPlan);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testPartitionedStorePlanWithMaxFileSize");
@@ -759,7 +759,7 @@ public class TestPhysicalPlanner {
@Test
public final void testPartitionedStorePlanWithEmptyGroupingSet()
throws IOException, PlanningException {
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
Integer.MAX_VALUE);
TaskAttemptId id = LocalTajoTestingUtility.newTaskAttemptId(masterPlan);
@@ -826,7 +826,7 @@ public class TestPhysicalPlanner {
@Test
public final void testAggregationFunction() throws IOException, PlanningException {
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testAggregationFunction");
TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -857,7 +857,7 @@ public class TestPhysicalPlanner {
@Test
public final void testCountFunction() throws IOException, PlanningException {
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testCountFunction");
TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -885,7 +885,7 @@ public class TestPhysicalPlanner {
@Test
public final void testGroupByWithNullValue() throws IOException, PlanningException {
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testGroupByWithNullValue");
TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -910,7 +910,7 @@ public class TestPhysicalPlanner {
@Test
public final void testUnionPlan() throws IOException, PlanningException, CloneNotSupportedException {
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
new Path(employee.getPath()), Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testUnionPlan");
TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -975,7 +975,7 @@ public class TestPhysicalPlanner {
//@Test
public final void testCreateIndex() throws IOException, PlanningException {
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
new Path(employee.getPath()), Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testCreateIndex");
TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -1002,7 +1002,7 @@ public class TestPhysicalPlanner {
@Test
public final void testDuplicateEliminate() throws IOException, PlanningException {
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(),
new Path(score.getPath()), Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testDuplicateEliminate");
@@ -1036,7 +1036,7 @@ public class TestPhysicalPlanner {
@Test
public final void testSortEnforcer() throws IOException, PlanningException {
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
new Path(employee.getPath()), Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testSortEnforcer");
@@ -1087,7 +1087,7 @@ public class TestPhysicalPlanner {
@Test
public final void testGroupByEnforcer() throws IOException, PlanningException {
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testGroupByEnforcer");
Expr context = analyzer.parse(QUERIES[7]);
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
index 94ebe51..600f388 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
@@ -86,7 +86,7 @@ public class TestProgressExternalSortExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta("RAW");
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+ Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
.getAppender(employeeMeta, schema, employeePath);
appender.enableStats();
appender.init();
@@ -134,7 +134,7 @@ public class TestProgressExternalSortExec {
}
private void testProgress(int sortBufferBytesNum) throws Exception {
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
new Path(employee.getPath()), Integer.MAX_VALUE);
Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
index 3455cb3..79db3bf 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
@@ -100,7 +100,7 @@ public class TestRightOuterHashJoinExec {
TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV");
Path dep3Path = new Path(testDir, "dep3.csv");
- Appender appender1 = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+ Appender appender1 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
.getAppender(dep3Meta, dep3Schema, dep3Path);
appender1.init();
Tuple tuple = new VTuple(dep3Schema.size());
@@ -130,7 +130,7 @@ public class TestRightOuterHashJoinExec {
TableMeta job3Meta = CatalogUtil.newTableMeta("CSV");
Path job3Path = new Path(testDir, "job3.csv");
- Appender appender2 = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+ Appender appender2 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
.getAppender(job3Meta, job3Schema, job3Path);
appender2.init();
Tuple tuple2 = new VTuple(job3Schema.size());
@@ -170,7 +170,7 @@ public class TestRightOuterHashJoinExec {
TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV");
Path emp3Path = new Path(testDir, "emp3.csv");
- Appender appender3 = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+ Appender appender3 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
.getAppender(emp3Meta, emp3Schema, emp3Path);
appender3.init();
Tuple tuple3 = new VTuple(emp3Schema.size());
@@ -229,9 +229,9 @@ public class TestRightOuterHashJoinExec {
@Test
public final void testRightOuter_HashJoinExec0() throws IOException, PlanningException {
- FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+ FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
Integer.MAX_VALUE);
- FileFragment[] dep3Frags = FileStorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
+ FileFragment[] dep3Frags = FileTablespace.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
@@ -270,9 +270,9 @@ public class TestRightOuterHashJoinExec {
@Test
public final void testRightOuter_HashJoinExec1() throws IOException, PlanningException {
- FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
+ FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
Integer.MAX_VALUE);
- FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+ FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
@@ -311,9 +311,9 @@ public class TestRightOuterHashJoinExec {
@Test
public final void testRightOuter_HashJoinExec2() throws IOException, PlanningException {
- FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+ FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
Integer.MAX_VALUE);
- FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
+ FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
index c6bf2ef..b67dc09 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
@@ -107,7 +107,7 @@ public class TestRightOuterMergeJoinExec {
TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV");
Path dep3Path = new Path(testDir, "dep3.csv");
- Appender appender1 = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+ Appender appender1 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
.getAppender(dep3Meta, dep3Schema, dep3Path);
appender1.init();
Tuple tuple = new VTuple(dep3Schema.size());
@@ -146,7 +146,7 @@ public class TestRightOuterMergeJoinExec {
TableMeta dep4Meta = CatalogUtil.newTableMeta("CSV");
Path dep4Path = new Path(testDir, "dep4.csv");
- Appender appender4 = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+ Appender appender4 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
.getAppender(dep4Meta, dep4Schema, dep4Path);
appender4.init();
Tuple tuple4 = new VTuple(dep4Schema.size());
@@ -178,7 +178,7 @@ public class TestRightOuterMergeJoinExec {
TableMeta job3Meta = CatalogUtil.newTableMeta("CSV");
Path job3Path = new Path(testDir, "job3.csv");
- Appender appender2 = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+ Appender appender2 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
.getAppender(job3Meta, job3Schema, job3Path);
appender2.init();
Tuple tuple2 = new VTuple(job3Schema.size());
@@ -218,7 +218,7 @@ public class TestRightOuterMergeJoinExec {
TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV");
Path emp3Path = new Path(testDir, "emp3.csv");
- Appender appender3 = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+ Appender appender3 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
.getAppender(emp3Meta, emp3Schema, emp3Path);
appender3.init();
Tuple tuple3 = new VTuple(emp3Schema.size());
@@ -271,7 +271,7 @@ public class TestRightOuterMergeJoinExec {
TableMeta phone3Meta = CatalogUtil.newTableMeta("CSV");
Path phone3Path = new Path(testDir, "phone3.csv");
- Appender appender5 = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+ Appender appender5 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
.getAppender(phone3Meta, phone3Schema, phone3Path);
appender5.init();
@@ -314,9 +314,9 @@ public class TestRightOuterMergeJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
- FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+ FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
Integer.MAX_VALUE);
- FileFragment[] dep3Frags = FileStorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
+ FileFragment[] dep3Frags = FileTablespace.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
@@ -352,9 +352,9 @@ public class TestRightOuterMergeJoinExec {
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
FileFragment[] emp3Frags =
- FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+ FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
FileFragment[] job3Frags =
- FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE);
+ FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testRightOuterMergeJoin1");
@@ -388,9 +388,9 @@ public class TestRightOuterMergeJoinExec {
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
FileFragment[] emp3Frags =
- FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+ FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
FileFragment[] job3Frags =
- FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE);
+ FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testRightOuterMergeJoin2");
@@ -424,9 +424,9 @@ public class TestRightOuterMergeJoinExec {
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
FileFragment[] emp3Frags =
- FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+ FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
FileFragment[] dep4Frags =
- FileStorageManager.splitNG(conf, DEP4_NAME, dep4.getMeta(), new Path(dep4.getPath()), Integer.MAX_VALUE);
+ FileTablespace.splitNG(conf, DEP4_NAME, dep4.getMeta(), new Path(dep4.getPath()), Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(emp3Frags, dep4Frags);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testRightOuter_MergeJoin3");
@@ -461,10 +461,10 @@ public class TestRightOuterMergeJoinExec {
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
FileFragment[] emp3Frags =
- FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+ FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
FileFragment[] phone3Frags =
- FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
- Integer.MAX_VALUE);
+ FileTablespace.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
+ Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testRightOuter_MergeJoin4");
@@ -498,8 +498,8 @@ public class TestRightOuterMergeJoinExec {
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
FileFragment[] emp3Frags =
- FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
- FileFragment[] phone3Frags = FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
+ FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+ FileFragment[] phone3Frags = FileTablespace.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(phone3Frags,emp3Frags);
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index a350831..6e0aa8e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -56,7 +56,7 @@ public class TestSortExec {
private static SQLAnalyzer analyzer;
private static LogicalPlanner planner;
private static LogicalOptimizer optimizer;
- private static FileStorageManager sm;
+ private static FileTablespace sm;
private static Path workDir;
private static Path tablePath;
private static TableMeta employeeMeta;
@@ -69,7 +69,7 @@ public class TestSortExec {
util = TpchTestBase.getInstance().getTestingCluster();
catalog = util.getMaster().getCatalog();
workDir = CommonTestingUtil.getTestDir(TEST_PATH);
- sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf);
+ sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf);
Schema schema = new Schema();
schema.addColumn("managerid", Type.INT4);
@@ -81,7 +81,7 @@ public class TestSortExec {
tablePath = StorageUtil.concatPath(workDir, "employee", "table1");
sm.getFileSystem().mkdirs(tablePath.getParent());
- Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+ Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
.getAppender(employeeMeta, schema, tablePath);
appender.init();
Tuple tuple = new VTuple(schema.size());
@@ -110,7 +110,7 @@ public class TestSortExec {
@Test
public final void testNext() throws IOException, PlanningException {
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employeeMeta, tablePath, Integer.MAX_VALUE);
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employeeMeta, tablePath, Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestSortExec");
TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
LocalTajoTestingUtility
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
index d2faf7e..b6a4707 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
@@ -37,7 +37,7 @@ import org.apache.tajo.datum.TextDatum;
import org.apache.tajo.plan.expr.*;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.storage.StorageConstants;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.Tablespace;
import org.apache.tajo.storage.TableSpaceManager;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.hbase.*;
@@ -205,7 +205,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
assertTableExists("external_hbase_mapped_table");
- HConnection hconn = ((HBaseStorageManager) TableSpaceManager.getStorageManager(conf, "HBASE"))
+ HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager(conf, "HBASE"))
.getConnection(testingCluster.getHBaseUtil().getConf());
HTableInterface htable = hconn.getTable("external_hbase_table");
@@ -244,7 +244,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
assertTableExists("external_hbase_mapped_table");
- HConnection hconn = ((HBaseStorageManager) TableSpaceManager.getStorageManager(conf, "HBASE"))
+ HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager(conf, "HBASE"))
.getConnection(testingCluster.getHBaseUtil().getConf());
HTableInterface htable = hconn.getTable("external_hbase_table");
@@ -297,7 +297,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
assertTableExists("external_hbase_mapped_table");
- HConnection hconn = ((HBaseStorageManager) TableSpaceManager.getStorageManager(conf, "HBASE"))
+ HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager(conf, "HBASE"))
.getConnection(testingCluster.getHBaseUtil().getConf());
HTableInterface htable = hconn.getTable("external_hbase_table");
@@ -334,7 +334,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
assertTableExists("external_hbase_mapped_table");
- HConnection hconn = ((HBaseStorageManager) TableSpaceManager.getStorageManager(conf, "HBASE"))
+ HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager(conf, "HBASE"))
.getConnection(testingCluster.getHBaseUtil().getConf());
HTableInterface htable = hconn.getTable("external_hbase_table");
@@ -469,8 +469,8 @@ public class TestHBaseTable extends QueryTestCaseBase {
EvalNode evalNodeEq = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
new ConstEval(new TextDatum("021")));
scanNode.setQual(evalNodeEq);
- StorageManager storageManager = TableSpaceManager.getStorageManager(conf, "HBASE");
- List<Fragment> fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
+ Tablespace tablespace = TableSpaceManager.getStorageManager(conf, "HBASE");
+ List<Fragment> fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode);
assertEquals(1, fragments.size());
assertEquals("021", new String(((HBaseFragment)fragments.get(0)).getStartRow()));
assertEquals("021" + postFix, new String(((HBaseFragment)fragments.get(0)).getStopRow()));
@@ -483,7 +483,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2);
scanNode.setQual(evalNodeA);
- fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
+ fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode);
assertEquals(2, fragments.size());
HBaseFragment fragment1 = (HBaseFragment) fragments.get(0);
assertEquals("020", new String(fragment1.getStartRow()));
@@ -498,7 +498,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
new ConstEval(new TextDatum("075")));
EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3);
scanNode.setQual(evalNodeB);
- fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
+ fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode);
assertEquals(3, fragments.size());
fragment1 = (HBaseFragment) fragments.get(0);
assertEquals("020", new String(fragment1.getStartRow()));
@@ -521,7 +521,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
scanNode.setQual(evalNodeD);
- fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
+ fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode);
assertEquals(3, fragments.size());
fragment1 = (HBaseFragment) fragments.get(0);
@@ -544,7 +544,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
scanNode.setQual(evalNodeD);
- fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
+ fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode);
assertEquals(2, fragments.size());
fragment1 = (HBaseFragment) fragments.get(0);
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
index 0e42412..a65c165 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
@@ -264,7 +264,7 @@ public class TestJoinQuery extends QueryTestCaseBase {
}
Path dataPath = new Path(table.getPath().toString(), fileIndex + ".csv");
fileIndex++;
- appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+ appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
.getAppender(tableMeta, schema, dataPath);
appender.init();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
index 70d07c3..f6fd88f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
+++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
@@ -57,7 +57,7 @@ public class TestResultSet {
private static TajoTestingCluster util;
private static TajoConf conf;
private static TableDesc desc;
- private static FileStorageManager sm;
+ private static FileTablespace sm;
private static TableMeta scoreMeta;
private static Schema scoreSchema;
private static List<ByteString> serializedData;
@@ -66,7 +66,7 @@ public class TestResultSet {
public static void setup() throws Exception {
util = TpchTestBase.getInstance().getTestingCluster();
conf = util.getConfiguration();
- sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf);
+ sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf);
scoreSchema = new Schema();
scoreSchema.addColumn("deptname", Type.TEXT);
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index 5efdede..0a473b5 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -33,7 +33,7 @@ import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.Tablespace;
import org.apache.tajo.storage.TableSpaceManager;
import org.apache.tajo.util.CommonTestingUtil;
import org.junit.AfterClass;
@@ -82,7 +82,7 @@ public class TestExecutionBlockCursor {
logicalPlanner = new LogicalPlanner(catalog);
optimizer = new LogicalOptimizer(conf);
- StorageManager sm = TableSpaceManager.getFileStorageManager(conf);
+ Tablespace sm = TableSpaceManager.getFileStorageManager(conf);
dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
index 52b59ea..3ab2df6 100644
--- a/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
+++ b/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
@@ -67,8 +67,8 @@ public class TestRowFile {
TableMeta meta = CatalogUtil.newTableMeta("ROWFILE");
- FileStorageManager sm =
- (FileStorageManager) TableSpaceManager.getFileStorageManager(conf);
+ FileTablespace sm =
+ (FileTablespace) TableSpaceManager.getFileStorageManager(conf);
Path tablePath = new Path("/test");
Path metaPath = new Path(tablePath, ".meta");
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
index d007aea..a8926a0 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -64,7 +64,7 @@ public class MergeScanner implements Scanner {
long numBytes = 0;
for (Fragment eachFileFragment: rawFragmentList) {
- long fragmentLength = StorageManager.getFragmentLength((TajoConf)conf, eachFileFragment);
+ long fragmentLength = Tablespace.getFragmentLength((TajoConf) conf, eachFileFragment);
if (fragmentLength > 0) {
numBytes += fragmentLength;
fragments.add(eachFileFragment);
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
deleted file mode 100644
index 0751035..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.OverridableConf;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * StorageManager manages the functions of storing and reading data.
- * StorageManager is a abstract class.
- * For supporting such as HDFS, HBASE, a specific StorageManager should be implemented by inheriting this class.
- *
- */
-public abstract class StorageManager implements TableSpace {
-
- public static final PathFilter hiddenFileFilter = new PathFilter() {
- public boolean accept(Path p) {
- String name = p.getName();
- return !name.startsWith("_") && !name.startsWith(".");
- }
- };
-
- protected TajoConf conf;
- protected String storeType;
-
- public StorageManager(String storeType) {
- this.storeType = storeType;
- }
-
- /**
- * Initialize storage manager.
- * @throws java.io.IOException
- */
- protected abstract void storageInit() throws IOException;
-
- /**
- * This method is called after executing "CREATE TABLE" statement.
- * If a storage is a file based storage, a storage manager may create directory.
- *
- * @param tableDesc Table description which is created.
- * @param ifNotExists Creates the table only when the table does not exist.
- * @throws java.io.IOException
- */
- @Override
- public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException;
-
- /**
- * This method is called after executing "DROP TABLE" statement with the 'PURGE' option
- * which is the option to delete all the data.
- *
- * @param tableDesc
- * @throws java.io.IOException
- */
- @Override
- public abstract void purgeTable(TableDesc tableDesc) throws IOException;
-
- /**
- * Returns the splits that will serve as input for the scan tasks. The
- * number of splits matches the number of regions in a table.
- * @param fragmentId The table name or previous ExecutionBlockId
- * @param tableDesc The table description for the target data.
- * @param scanNode The logical node for scanning.
- * @return The list of input fragments.
- * @throws java.io.IOException
- */
- @Override
- public abstract List<Fragment> getSplits(String fragmentId, TableDesc tableDesc,
- ScanNode scanNode) throws IOException;
-
- /**
- * It returns the splits that will serve as input for the non-forward query scanner such as 'select * from table1'.
- * The result list should be small. If there is many fragments for scanning, TajoMaster uses the paging navigation.
- * @param tableDesc The table description for the target data.
- * @param currentPage The current page number within the entire list.
- * @param numFragments The number of fragments in the result.
- * @return The list of input fragments.
- * @throws java.io.IOException
- */
- public abstract List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments)
- throws IOException;
-
- /**
- * It returns the storage property.
- * @return The storage property
- */
- public abstract StorageProperty getStorageProperty();
-
- /**
- * Release storage manager resource
- */
- @Override
- public abstract void close();
-
-
- /**
- * It is called by a Repartitioner for range shuffling when the SortRangeType of SortNode is USING_STORAGE_MANAGER.
- * In general Repartitioner determines the partition range using previous output statistics data.
- * In the special cases, such as HBase Repartitioner uses the result of this method.
- *
- * @param queryContext The current query context which contains query properties.
- * @param tableDesc The table description for the target data.
- * @param inputSchema The input schema
- * @param sortSpecs The sort specification that contains the sort column and sort order.
- * @return The list of sort ranges.
- * @throws java.io.IOException
- */
- public abstract TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc,
- Schema inputSchema, SortSpec[] sortSpecs,
- TupleRange dataRange) throws IOException;
-
- /**
- * This method is called before executing 'INSERT' or 'CREATE TABLE as SELECT'.
- * In general Tajo creates the target table after finishing the final sub-query of CATS.
- * But In the special cases, such as HBase INSERT or CAST query uses the target table information.
- * That kind of the storage should implements the logic related to creating table in this method.
- *
- * @param node The child node of the root node.
- * @throws java.io.IOException
- */
- public abstract void beforeInsertOrCATS(LogicalNode node) throws IOException;
-
- /**
- * It is called when the query failed.
- * Each storage manager should implement to be processed when the query fails in this method.
- *
- * @param node The child node of the root node.
- * @throws java.io.IOException
- */
- public abstract void rollbackOutputCommit(LogicalNode node) throws IOException;
-
- /**
- * Returns the current storage type.
- * @return
- */
- public String getStoreType() {
- return storeType;
- }
-
- /**
- * Initialize StorageManager instance. It should be called before using.
- *
- * @param tajoConf
- * @throws java.io.IOException
- */
- public void init(TajoConf tajoConf) throws IOException {
- this.conf = tajoConf;
- storageInit();
- }
-
- /**
- * Returns the splits that will serve as input for the scan tasks. The
- * number of splits matches the number of regions in a table.
- *
- * @param fragmentId The table name or previous ExecutionBlockId
- * @param tableDesc The table description for the target data.
- * @return The list of input fragments.
- * @throws java.io.IOException
- */
- public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) throws IOException {
- return getSplits(fragmentId, tableDesc, null);
- }
-
- /**
- * Returns Scanner instance.
- *
- * @param meta The table meta
- * @param schema The input schema
- * @param fragment The fragment for scanning
- * @param target Columns which are selected.
- * @return Scanner instance
- * @throws java.io.IOException
- */
- @Override
- public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException {
- return getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target);
- }
-
- /**
- * Returns Scanner instance.
- *
- * @param meta The table meta
- * @param schema The input schema
- * @param fragment The fragment for scanning
- * @return Scanner instance
- * @throws java.io.IOException
- */
- @Override
- public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException {
- return getScanner(meta, schema, fragment, schema);
- }
-
- /**
- * Returns Scanner instance.
- *
- * @param meta The table meta
- * @param schema The input schema
- * @param fragment The fragment for scanning
- * @param target The output schema
- * @return Scanner instance
- * @throws java.io.IOException
- */
- @Override
- public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
- if (fragment.isEmpty()) {
- Scanner scanner = new NullScanner(conf, schema, meta, fragment);
- scanner.setTarget(target.toArray());
-
- return scanner;
- }
-
- Scanner scanner;
-
- Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType());
- scanner = TableSpaceManager.newScannerInstance(scannerClass, conf, schema, meta, fragment);
- scanner.setTarget(target.toArray());
-
- return scanner;
- }
-
- /**
- * Returns Appender instance.
- * @param queryContext Query property.
- * @param taskAttemptId Task id.
- * @param meta Table meta data.
- * @param schema Output schema.
- * @param workDir Working directory
- * @return Appender instance
- * @throws java.io.IOException
- */
- public Appender getAppender(OverridableConf queryContext,
- TaskAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
- throws IOException {
- Appender appender;
-
- Class<? extends Appender> appenderClass;
-
- String handlerName = meta.getStoreType().toLowerCase();
- appenderClass = TableSpaceManager.APPENDER_HANDLER_CACHE.get(handlerName);
- if (appenderClass == null) {
- appenderClass = conf.getClass(
- String.format("tajo.storage.appender-handler.%s.class", handlerName), null, Appender.class);
- TableSpaceManager.APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
- }
-
- if (appenderClass == null) {
- throw new IOException("Unknown Storage Type: " + meta.getStoreType());
- }
-
- appender = TableSpaceManager.newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir);
-
- return appender;
- }
-
- /**
- * Return the Scanner class for the StoreType that is defined in storage-default.xml.
- *
- * @param storeType store type
- * @return The Scanner class
- * @throws java.io.IOException
- */
- public Class<? extends Scanner> getScannerClass(String storeType) throws IOException {
- String handlerName = storeType.toLowerCase();
- Class<? extends Scanner> scannerClass = TableSpaceManager.SCANNER_HANDLER_CACHE.get(handlerName);
- if (scannerClass == null) {
- scannerClass = conf.getClass(
- String.format("tajo.storage.scanner-handler.%s.class", handlerName), null, Scanner.class);
- TableSpaceManager.SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
- }
-
- if (scannerClass == null) {
- throw new IOException("Unknown Storage Type: " + storeType);
- }
-
- return scannerClass;
- }
-
- /**
- * Return length of the fragment.
- * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration.
- *
- * @param conf Tajo system property
- * @param fragment Fragment
- * @return
- */
- public static long getFragmentLength(TajoConf conf, Fragment fragment) {
- if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) {
- return conf.getLongVar(ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH);
- } else {
- return fragment.getLength();
- }
- }
-
- /**
- * It is called after making logical plan. Storage manager should verify the schema for inserting.
- *
- * @param tableDesc The table description of insert target.
- * @param outSchema The output schema of select query for inserting.
- * @throws java.io.IOException
- */
- @Override
- public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException {
- // nothing to do
- }
-
- /**
- * Returns the list of storage specified rewrite rules.
- * This values are used by LogicalOptimizer.
- *
- * @param queryContext The query property
- * @param tableDesc The description of the target table.
- * @return The list of storage specified rewrite rules
- * @throws java.io.IOException
- */
- @Override
- public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc)
- throws IOException {
- return null;
- }
-
- /**
- * Finalizes result data. Tajo stores result data in the staging directory.
- * If the query fails, clean up the staging directory.
- * Otherwise the query is successful, move to the final directory from the staging directory.
- *
- * @param queryContext The query property
- * @param finalEbId The final execution block id
- * @param plan The query plan
- * @param schema The final output schema
- * @param tableDesc The description of the target table
- * @return Saved path
- * @throws java.io.IOException
- */
- @Override
- public abstract Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
- LogicalPlan plan, Schema schema,
- TableDesc tableDesc) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpace.java
deleted file mode 100644
index ef4aa9a..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpace.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.OverridableConf;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
-import org.apache.tajo.storage.fragment.Fragment;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * It manages each tablespace; e.g., HDFS, Local file system, and Amazon S3.
- */
-public interface TableSpace extends Closeable {
- //public void format() throws IOException;
-
- void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException;
-
- void purgeTable(TableDesc tableDesc) throws IOException;
-
- List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, ScanNode scanNode) throws IOException;
-
- List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) throws IOException;
-
-// public void renameTable() throws IOException;
-//
-// public void truncateTable() throws IOException;
-//
-// public long availableCapacity() throws IOException;
-//
-// public long totalCapacity() throws IOException;
-
- Scanner getScanner(TableMeta meta, Schema schema, CatalogProtos.FragmentProto fragment, Schema target) throws IOException;
-
- Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException;
-
- Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException;
-
- Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
- LogicalPlan plan, Schema schema,
- TableDesc tableDesc) throws IOException;
-
- void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException;
-
- List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException;
-
- void close() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
index 42a5e07..a787cdb 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
@@ -63,10 +63,10 @@ public class TableSpaceManager {
Path.class
};
/**
- * Cache of StorageManager.
+ * Cache of Tablespace.
* Key is manager key(warehouse path) + store type
*/
- private static final Map<String, StorageManager> storageManagers = Maps.newHashMap();
+ private static final Map<String, Tablespace> storageManagers = Maps.newHashMap();
/**
* Cache of constructors for each class. Pins the classes so they
* can't be garbage collected until ReflectionUtils can be collected.
@@ -86,13 +86,13 @@ public class TableSpaceManager {
}
/**
- * Close StorageManager
+ * Close Tablespace
* @throws java.io.IOException
*/
public static void shutdown() throws IOException {
synchronized(storageManagers) {
- for (StorageManager eachStorageManager: storageManagers.values()) {
- eachStorageManager.close();
+ for (Tablespace eachTablespace : storageManagers.values()) {
+ eachTablespace.close();
}
}
clearCache();
@@ -105,19 +105,19 @@ public class TableSpaceManager {
* @return
* @throws IOException
*/
- public static StorageManager getFileStorageManager(TajoConf tajoConf) throws IOException {
+ public static Tablespace getFileStorageManager(TajoConf tajoConf) throws IOException {
return getStorageManager(tajoConf, "CSV");
}
/**
- * Returns the proper StorageManager instance according to the storeType.
+ * Returns the proper Tablespace instance according to the storeType.
*
* @param tajoConf Tajo system property.
* @param storeType Storage type
* @return
* @throws IOException
*/
- public static StorageManager getStorageManager(TajoConf tajoConf, String storeType) throws IOException {
+ public static Tablespace getStorageManager(TajoConf tajoConf, String storeType) throws IOException {
FileSystem fileSystem = TajoConf.getWarehouseDir(tajoConf).getFileSystem(tajoConf);
if (fileSystem != null) {
return getStorageManager(tajoConf, storeType, fileSystem.getUri().toString());
@@ -127,7 +127,7 @@ public class TableSpaceManager {
}
/**
- * Returns the proper StorageManager instance according to the storeType
+ * Returns the proper Tablespace instance according to the storeType
*
* @param tajoConf Tajo system property.
* @param storeType Storage type
@@ -135,7 +135,7 @@ public class TableSpaceManager {
* @return
* @throws IOException
*/
- private static synchronized StorageManager getStorageManager (
+ private static synchronized Tablespace getStorageManager (
TajoConf tajoConf, String storeType, String managerKey) throws IOException {
String typeName;
@@ -147,19 +147,19 @@ public class TableSpaceManager {
synchronized (storageManagers) {
String storeKey = typeName + "_" + managerKey;
- StorageManager manager = storageManagers.get(storeKey);
+ Tablespace manager = storageManagers.get(storeKey);
if (manager == null) {
- Class<? extends StorageManager> storageManagerClass =
- tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, StorageManager.class);
+ Class<? extends Tablespace> storageManagerClass =
+ tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, Tablespace.class);
if (storageManagerClass == null) {
throw new IOException("Unknown Storage Type: " + typeName);
}
try {
- Constructor<? extends StorageManager> constructor =
- (Constructor<? extends StorageManager>) CONSTRUCTOR_CACHE.get(storageManagerClass);
+ Constructor<? extends Tablespace> constructor =
+ (Constructor<? extends Tablespace>) CONSTRUCTOR_CACHE.get(storageManagerClass);
if (constructor == null) {
constructor = storageManagerClass.getDeclaredConstructor(new Class<?>[]{String.class});
constructor.setAccessible(true);
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
new file mode 100644
index 0000000..0626da8
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tablespace manages the functions of storing and reading data.
+ * Tablespace is a abstract class.
+ * For supporting such as HDFS, HBASE, a specific Tablespace should be implemented by inheriting this class.
+ *
+ */
+public abstract class Tablespace {
+
+ public static final PathFilter hiddenFileFilter = new PathFilter() {
+ public boolean accept(Path p) {
+ String name = p.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ };
+
+ protected TajoConf conf;
+ protected String storeType;
+
+ public Tablespace(String storeType) {
+ this.storeType = storeType;
+ }
+
+ /**
+ * Initialize storage manager.
+ * @throws java.io.IOException
+ */
+ protected abstract void storageInit() throws IOException;
+
+ /**
+ * This method is called after executing "CREATE TABLE" statement.
+ * If a storage is a file based storage, a storage manager may create directory.
+ *
+ * @param tableDesc Table description which is created.
+ * @param ifNotExists Creates the table only when the table does not exist.
+ * @throws java.io.IOException
+ */
+ public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException;
+
+ /**
+ * This method is called after executing "DROP TABLE" statement with the 'PURGE' option
+ * which is the option to delete all the data.
+ *
+ * @param tableDesc
+ * @throws java.io.IOException
+ */
+ public abstract void purgeTable(TableDesc tableDesc) throws IOException;
+
+ /**
+ * Returns the splits that will serve as input for the scan tasks. The
+ * number of splits matches the number of regions in a table.
+ * @param fragmentId The table name or previous ExecutionBlockId
+ * @param tableDesc The table description for the target data.
+ * @param scanNode The logical node for scanning.
+ * @return The list of input fragments.
+ * @throws java.io.IOException
+ */
+ public abstract List<Fragment> getSplits(String fragmentId, TableDesc tableDesc,
+ ScanNode scanNode) throws IOException;
+
+ /**
+ * It returns the splits that will serve as input for the non-forward query scanner such as 'select * from table1'.
+ * The result list should be small. If there is many fragments for scanning, TajoMaster uses the paging navigation.
+ * @param tableDesc The table description for the target data.
+ * @param currentPage The current page number within the entire list.
+ * @param numFragments The number of fragments in the result.
+ * @return The list of input fragments.
+ * @throws java.io.IOException
+ */
+ public abstract List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments)
+ throws IOException;
+
+ /**
+ * It returns the storage property.
+ * @return The storage property
+ */
+ public abstract StorageProperty getStorageProperty();
+
+ /**
+ * Release storage manager resource
+ */
+ public abstract void close();
+
+
+ /**
+ * It is called by a Repartitioner for range shuffling when the SortRangeType of SortNode is USING_STORAGE_MANAGER.
+ * In general Repartitioner determines the partition range using previous output statistics data.
+ * In the special cases, such as HBase Repartitioner uses the result of this method.
+ *
+ * @param queryContext The current query context which contains query properties.
+ * @param tableDesc The table description for the target data.
+ * @param inputSchema The input schema
+ * @param sortSpecs The sort specification that contains the sort column and sort order.
+ * @return The list of sort ranges.
+ * @throws java.io.IOException
+ */
+ public abstract TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc,
+ Schema inputSchema, SortSpec[] sortSpecs,
+ TupleRange dataRange) throws IOException;
+
+ /**
+ * This method is called before executing 'INSERT' or 'CREATE TABLE as SELECT'.
+ * In general Tajo creates the target table after finishing the final sub-query of CATS.
+ * But In the special cases, such as HBase INSERT or CAST query uses the target table information.
+ * That kind of the storage should implements the logic related to creating table in this method.
+ *
+ * @param node The child node of the root node.
+ * @throws java.io.IOException
+ */
+ public abstract void beforeInsertOrCATS(LogicalNode node) throws IOException;
+
+ /**
+ * It is called when the query failed.
+ * Each storage manager should implement to be processed when the query fails in this method.
+ *
+ * @param node The child node of the root node.
+ * @throws java.io.IOException
+ */
+
+ /**
+ * Returns the current storage type.
+ * @return
+ */
+ public String getStoreType() {
+ return storeType;
+ }
+
+ /**
+ * Initialize Tablespace instance. It should be called before using.
+ *
+ * @param tajoConf
+ * @throws java.io.IOException
+ */
+ public void init(TajoConf tajoConf) throws IOException {
+ this.conf = tajoConf;
+ storageInit();
+ }
+
+ /**
+ * Returns the splits that will serve as input for the scan tasks. The
+ * number of splits matches the number of regions in a table.
+ *
+ * @param fragmentId The table name or previous ExecutionBlockId
+ * @param tableDesc The table description for the target data.
+ * @return The list of input fragments.
+ * @throws java.io.IOException
+ */
+ public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) throws IOException {
+ return getSplits(fragmentId, tableDesc, null);
+ }
+
+ /**
+ * Returns Scanner instance.
+ *
+ * @param meta The table meta
+ * @param schema The input schema
+ * @param fragment The fragment for scanning
+ * @param target Columns which are selected.
+ * @return Scanner instance
+ * @throws java.io.IOException
+ */
+ public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException {
+ return getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target);
+ }
+
+ /**
+ * Returns Scanner instance.
+ *
+ * @param meta The table meta
+ * @param schema The input schema
+ * @param fragment The fragment for scanning
+ * @return Scanner instance
+ * @throws java.io.IOException
+ */
+ public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException {
+ return getScanner(meta, schema, fragment, schema);
+ }
+
+ /**
+ * Returns Scanner instance.
+ *
+ * @param meta The table meta
+ * @param schema The input schema
+ * @param fragment The fragment for scanning
+ * @param target The output schema
+ * @return Scanner instance
+ * @throws java.io.IOException
+ */
+ public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
+ if (fragment.isEmpty()) {
+ Scanner scanner = new NullScanner(conf, schema, meta, fragment);
+ scanner.setTarget(target.toArray());
+
+ return scanner;
+ }
+
+ Scanner scanner;
+
+ Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType());
+ scanner = TableSpaceManager.newScannerInstance(scannerClass, conf, schema, meta, fragment);
+ scanner.setTarget(target.toArray());
+
+ return scanner;
+ }
+
+ /**
+ * Returns Appender instance.
+ * @param queryContext Query property.
+ * @param taskAttemptId Task id.
+ * @param meta Table meta data.
+ * @param schema Output schema.
+ * @param workDir Working directory
+ * @return Appender instance
+ * @throws java.io.IOException
+ */
+ public Appender getAppender(OverridableConf queryContext,
+ TaskAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
+ throws IOException {
+ Appender appender;
+
+ Class<? extends Appender> appenderClass;
+
+ String handlerName = meta.getStoreType().toLowerCase();
+ appenderClass = TableSpaceManager.APPENDER_HANDLER_CACHE.get(handlerName);
+ if (appenderClass == null) {
+ appenderClass = conf.getClass(
+ String.format("tajo.storage.appender-handler.%s.class", handlerName), null, Appender.class);
+ TableSpaceManager.APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
+ }
+
+ if (appenderClass == null) {
+ throw new IOException("Unknown Storage Type: " + meta.getStoreType());
+ }
+
+ appender = TableSpaceManager.newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir);
+
+ return appender;
+ }
+
+ /**
+ * Return the Scanner class for the StoreType that is defined in storage-default.xml.
+ *
+ * @param storeType store type
+ * @return The Scanner class
+ * @throws java.io.IOException
+ */
+ public Class<? extends Scanner> getScannerClass(String storeType) throws IOException {
+ String handlerName = storeType.toLowerCase();
+ Class<? extends Scanner> scannerClass = TableSpaceManager.SCANNER_HANDLER_CACHE.get(handlerName);
+ if (scannerClass == null) {
+ scannerClass = conf.getClass(
+ String.format("tajo.storage.scanner-handler.%s.class", handlerName), null, Scanner.class);
+ TableSpaceManager.SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
+ }
+
+ if (scannerClass == null) {
+ throw new IOException("Unknown Storage Type: " + storeType);
+ }
+
+ return scannerClass;
+ }
+
+ /**
+ * Return length of the fragment.
+ * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration.
+ *
+ * @param conf Tajo system property
+ * @param fragment Fragment
+ * @return
+ */
+ public static long getFragmentLength(TajoConf conf, Fragment fragment) {
+ if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) {
+ return conf.getLongVar(ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH);
+ } else {
+ return fragment.getLength();
+ }
+ }
+
+ public abstract void rollbackOutputCommit(LogicalNode node) throws IOException;
+
+ /**
+ * It is called after making logical plan. Storage manager should verify the schema for inserting.
+ *
+ * @param tableDesc The table description of insert target.
+ * @param outSchema The output schema of select query for inserting.
+ * @throws java.io.IOException
+ */
+ public abstract void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException;
+
+ /**
+ * Returns the list of storage specified rewrite rules.
+ * This values are used by LogicalOptimizer.
+ *
+ * @param queryContext The query property
+ * @param tableDesc The description of the target table.
+ * @return The list of storage specified rewrite rules
+ * @throws java.io.IOException
+ */
+ public abstract List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc)
+ throws IOException;
+
+ /**
+ * Finalizes result data. Tajo stores result data in the staging directory.
+ * If the query fails, clean up the staging directory.
+ * Otherwise the query is successful, move to the final directory from the staging directory.
+ *
+ * @param queryContext The query property
+ * @param finalEbId The final execution block id
+ * @param plan The query plan
+ * @param schema The final output schema
+ * @param tableDesc The description of the target table
+ * @return Saved path
+ * @throws java.io.IOException
+ */
+ public abstract Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
+ LogicalPlan plan, Schema schema,
+ TableDesc tableDesc) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
index aa078a7..93611fb 100644
--- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
@@ -23,11 +23,11 @@
<!-- Storage Manager Configuration -->
<property>
<name>tajo.storage.manager.hdfs.class</name>
- <value>org.apache.tajo.storage.FileStorageManager</value>
+ <value>org.apache.tajo.storage.FileTablespace</value>
</property>
<property>
<name>tajo.storage.manager.hbase.class</name>
- <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value>
+ <value>org.apache.tajo.storage.hbase.HBaseTablespace</value>
</property>
<property>
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
index 712f664..6aa32fc 100644
--- a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
@@ -28,11 +28,11 @@
<!-- Storage Manager Configuration -->
<property>
<name>tajo.storage.manager.hdfs.class</name>
- <value>org.apache.tajo.storage.FileStorageManager</value>
+ <value>org.apache.tajo.storage.FileTablespace</value>
</property>
<property>
<name>tajo.storage.manager.hbase.class</name>
- <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value>
+ <value>org.apache.tajo.storage.hbase.HBaseTablespace</value>
</property>
<!--- Registered Scanner Handler -->
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
index 09a86b4..b1a2d59 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
@@ -46,8 +46,8 @@ public class HBasePutAppender extends AbstractHBaseAppender {
public void init() throws IOException {
super.init();
- Configuration hbaseConf = HBaseStorageManager.getHBaseConfiguration(conf, meta);
- HConnection hconn = ((HBaseStorageManager) TableSpaceManager.getStorageManager((TajoConf) conf, "HBASE"))
+ Configuration hbaseConf = HBaseTablespace.getHBaseConfiguration(conf, meta);
+ HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager((TajoConf) conf, "HBASE"))
.getConnection(hbaseConf);
htable = hconn.getTable(columnMapping.getHbaseTableName());
htable.setAutoFlushTo(false);
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
index 24bfd4d..992c13c 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
@@ -133,7 +133,7 @@ public class HBaseScanner implements Scanner {
rowKeyDelimiter = columnMapping.getRowKeyDelimiter();
rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes();
- hbaseConf = HBaseStorageManager.getHBaseConfiguration(conf, meta);
+ hbaseConf = HBaseTablespace.getHBaseConfiguration(conf, meta);
initScanner();
}
@@ -181,7 +181,7 @@ public class HBaseScanner implements Scanner {
}
if (htable == null) {
- HConnection hconn = ((HBaseStorageManager) TableSpaceManager.getStorageManager(conf, "HBASE"))
+ HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager(conf, "HBASE"))
.getConnection(hbaseConf);
htable = hconn.getTable(fragment.getHbaseTableName());
}