You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/06/25 02:55:49 UTC
[2/3] tajo git commit: TAJO-1616: Implement TablespaceManager to load
Tablespaces. (missed commits)
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index ae90502..afa273b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -84,7 +84,7 @@ public class TestHashSemiJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV");
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(employeeMeta, employeeSchema, employeePath);
appender.init();
VTuple tuple = new VTuple(employeeSchema.size());
@@ -110,7 +110,7 @@ public class TestHashSemiJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta("CSV");
Path peoplePath = new Path(testDir, "people.csv");
- appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+ appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(peopleMeta, peopleSchema, peoplePath);
appender.init();
tuple = new VTuple(peopleSchema.size());
@@ -133,7 +133,7 @@ public class TestHashSemiJoinExec {
people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
catalog.createTable(people);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
optimizer = new LogicalOptimizer(conf);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
index bbb441c..c93a1b4 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
@@ -105,7 +105,7 @@ public class TestLeftOuterHashJoinExec {
TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV");
Path dep3Path = new Path(testDir, "dep3.csv");
- Appender appender1 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path);
+ Appender appender1 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path);
appender1.init();
VTuple tuple = new VTuple(dep3Schema.size());
for (int i = 0; i < 10; i++) {
@@ -134,7 +134,7 @@ public class TestLeftOuterHashJoinExec {
TableMeta job3Meta = CatalogUtil.newTableMeta("CSV");
Path job3Path = new Path(testDir, "job3.csv");
- Appender appender2 = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(job3Meta, job3Schema, job3Path);
appender2.init();
VTuple tuple2 = new VTuple(job3Schema.size());
@@ -174,7 +174,7 @@ public class TestLeftOuterHashJoinExec {
TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV");
Path emp3Path = new Path(testDir, "emp3.csv");
- Appender appender3 = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender3 = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(emp3Meta, emp3Schema, emp3Path);
appender3.init();
VTuple tuple3 = new VTuple(emp3Schema.size());
@@ -227,7 +227,7 @@ public class TestLeftOuterHashJoinExec {
TableMeta phone3Meta = CatalogUtil.newTableMeta("CSV");
Path phone3Path = new Path(testDir, "phone3.csv");
- Appender appender5 = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender5 = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(phone3Meta, phone3Schema, phone3Path);
appender5.init();
@@ -239,7 +239,7 @@ public class TestLeftOuterHashJoinExec {
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index d0d0983..c4e7752 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -85,7 +85,7 @@ public class TestMergeJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV");
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(employeeMeta, employeeSchema, employeePath);
appender.init();
VTuple tuple = new VTuple(employeeSchema.size());
@@ -114,7 +114,7 @@ public class TestMergeJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta("CSV");
Path peoplePath = new Path(testDir, "people.csv");
- appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+ appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(peopleMeta, peopleSchema, peoplePath);
appender.init();
tuple = new VTuple(peopleSchema.size());
@@ -139,7 +139,7 @@ public class TestMergeJoinExec {
people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
catalog.createTable(people);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
}
@After
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
index 4866323..1b30ef8 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -83,7 +83,7 @@ public class TestNLJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV");
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(employeeMeta, schema, employeePath);
appender.init();
VTuple tuple = new VTuple(schema.size());
@@ -107,7 +107,7 @@ public class TestNLJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta("CSV");
Path peoplePath = new Path(testDir, "people.csv");
- appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+ appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(peopleMeta, peopleSchema, peoplePath);
appender.init();
tuple = new VTuple(peopleSchema.size());
@@ -125,7 +125,7 @@ public class TestNLJoinExec {
people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
catalog.createTable(people);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index b2a228a..dff0cbe 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -98,7 +98,7 @@ public class TestPhysicalPlanner {
util.startCatalogCluster();
conf = util.getConfiguration();
testDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestPhysicalPlanner");
- sm = TableSpaceManager.getLocalFs();
+ sm = TablespaceManager.getLocalFs();
catalog = util.getMiniCatalogCluster().getCatalog();
catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
@@ -164,7 +164,7 @@ public class TestPhysicalPlanner {
appender.close();
catalog.createTable(score);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
optimizer = new LogicalOptimizer(conf);
defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
@@ -180,7 +180,7 @@ public class TestPhysicalPlanner {
Schema scoreSchmea = score.getSchema();
TableMeta scoreLargeMeta = CatalogUtil.newTableMeta("RAW", new KeyValueSet());
- Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(scoreLargeMeta, scoreSchmea, scoreLargePath);
appender.enableStats();
appender.init();
@@ -442,7 +442,7 @@ public class TestPhysicalPlanner {
exec.next();
exec.close();
- Scanner scanner = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Scanner scanner = ((FileTablespace) TablespaceManager.getLocalFs())
.getFileScanner(outputMeta, rootNode.getOutSchema(), ctx.getOutputPath());
scanner.init();
Tuple tuple;
@@ -502,7 +502,7 @@ public class TestPhysicalPlanner {
// checking the file contents
long totalNum = 0;
for (FileStatus status : fs.listStatus(ctx.getOutputPath().getParent())) {
- Scanner scanner = ((FileTablespace) TableSpaceManager.getLocalFs()).getFileScanner(
+ Scanner scanner = ((FileTablespace) TablespaceManager.getLocalFs()).getFileScanner(
CatalogUtil.newTableMeta("CSV"),
rootNode.getOutSchema(),
status.getPath());
@@ -539,7 +539,7 @@ public class TestPhysicalPlanner {
exec.next();
exec.close();
- Scanner scanner = ((FileTablespace) TableSpaceManager.getLocalFs()).getFileScanner(
+ Scanner scanner = ((FileTablespace) TablespaceManager.getLocalFs()).getFileScanner(
outputMeta, rootNode.getOutSchema(), ctx.getOutputPath());
scanner.init();
Tuple tuple;
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
index 1b54948..d1da787 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
@@ -86,7 +86,7 @@ public class TestProgressExternalSortExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta("RAW");
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(employeeMeta, schema, employeePath);
appender.enableStats();
appender.init();
@@ -110,7 +110,7 @@ public class TestProgressExternalSortExec {
employeePath.toUri());
catalog.createTable(employee);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
}
@After
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
index c956f29..f581db8 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
@@ -100,7 +100,7 @@ public class TestRightOuterHashJoinExec {
TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV");
Path dep3Path = new Path(testDir, "dep3.csv");
- Appender appender1 = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender1 = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(dep3Meta, dep3Schema, dep3Path);
appender1.init();
VTuple tuple = new VTuple(dep3Schema.size());
@@ -130,7 +130,7 @@ public class TestRightOuterHashJoinExec {
TableMeta job3Meta = CatalogUtil.newTableMeta("CSV");
Path job3Path = new Path(testDir, "job3.csv");
- Appender appender2 = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(job3Meta, job3Schema, job3Path);
appender2.init();
VTuple tuple2 = new VTuple(job3Schema.size());
@@ -170,7 +170,7 @@ public class TestRightOuterHashJoinExec {
TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV");
Path emp3Path = new Path(testDir, "emp3.csv");
- Appender appender3 = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender3 = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(emp3Meta, emp3Schema, emp3Path);
appender3.init();
VTuple tuple3 = new VTuple(emp3Schema.size());
@@ -212,7 +212,7 @@ public class TestRightOuterHashJoinExec {
catalog.createTable(emp3);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
index 25f0ca4..d86b229 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
@@ -107,7 +107,7 @@ public class TestRightOuterMergeJoinExec {
TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV");
Path dep3Path = new Path(testDir, "dep3.csv");
- Appender appender1 = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender1 = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(dep3Meta, dep3Schema, dep3Path);
appender1.init();
VTuple tuple = new VTuple(dep3Schema.size());
@@ -146,7 +146,7 @@ public class TestRightOuterMergeJoinExec {
TableMeta dep4Meta = CatalogUtil.newTableMeta("CSV");
Path dep4Path = new Path(testDir, "dep4.csv");
- Appender appender4 = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender4 = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(dep4Meta, dep4Schema, dep4Path);
appender4.init();
VTuple tuple4 = new VTuple(dep4Schema.size());
@@ -178,7 +178,7 @@ public class TestRightOuterMergeJoinExec {
TableMeta job3Meta = CatalogUtil.newTableMeta("CSV");
Path job3Path = new Path(testDir, "job3.csv");
- Appender appender2 = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(job3Meta, job3Schema, job3Path);
appender2.init();
VTuple tuple2 = new VTuple(job3Schema.size());
@@ -218,7 +218,7 @@ public class TestRightOuterMergeJoinExec {
TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV");
Path emp3Path = new Path(testDir, "emp3.csv");
- Appender appender3 = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender3 = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(emp3Meta, emp3Schema, emp3Path);
appender3.init();
VTuple tuple3 = new VTuple(emp3Schema.size());
@@ -271,7 +271,7 @@ public class TestRightOuterMergeJoinExec {
TableMeta phone3Meta = CatalogUtil.newTableMeta("CSV");
Path phone3Path = new Path(testDir, "phone3.csv");
- Appender appender5 = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender5 = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(phone3Meta, phone3Schema, phone3Path);
appender5.init();
@@ -281,7 +281,7 @@ public class TestRightOuterMergeJoinExec {
catalog.createTable(phone3);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index ce12faf..4690e71 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -69,7 +69,7 @@ public class TestSortExec {
util = TpchTestBase.getInstance().getTestingCluster();
catalog = util.getMaster().getCatalog();
workDir = CommonTestingUtil.getTestDir(TEST_PATH);
- sm = TableSpaceManager.getLocalFs();
+ sm = TablespaceManager.getLocalFs();
Schema schema = new Schema();
schema.addColumn("managerid", Type.INT4);
@@ -81,7 +81,7 @@ public class TestSortExec {
tablePath = StorageUtil.concatPath(workDir, "employee", "table1");
sm.getFileSystem().mkdirs(tablePath.getParent());
- Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(employeeMeta, schema, tablePath);
appender.init();
VTuple tuple = new VTuple(schema.size());
@@ -101,7 +101,7 @@ public class TestSortExec {
catalog.createTable(desc);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
optimizer = new LogicalOptimizer(conf);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
index 3d2d857..569111c 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
@@ -38,7 +38,7 @@ import org.apache.tajo.datum.TextDatum;
import org.apache.tajo.plan.expr.*;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.storage.StorageConstants;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.storage.Tablespace;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.hbase.*;
@@ -82,7 +82,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
tableSpaceUri = "hbase:zk://" + hostName + ":" + zkPort;
HBaseTablespace hBaseTablespace = new HBaseTablespace("cluster1", URI.create(tableSpaceUri));
hBaseTablespace.init(new TajoConf(testingCluster.getHBaseUtil().getConf()));
- TableSpaceManager.addTableSpaceForTest(hBaseTablespace);
+ TablespaceManager.addTableSpaceForTest(hBaseTablespace);
}
@AfterClass
@@ -213,7 +213,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
assertTableExists("external_hbase_mapped_table");
- HBaseTablespace space = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get();
+ HBaseTablespace space = (HBaseTablespace) TablespaceManager.getByName("cluster1").get();
HConnection hconn = space.getConnection();
HTableInterface htable = hconn.getTable("external_hbase_table");
@@ -253,7 +253,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
assertTableExists("external_hbase_mapped_table");
- HBaseTablespace space = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get();
+ HBaseTablespace space = (HBaseTablespace) TablespaceManager.getByName("cluster1").get();
HConnection hconn = space.getConnection();
HTableInterface htable = hconn.getTable("external_hbase_table");
@@ -306,7 +306,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
assertTableExists("external_hbase_mapped_table");
- HBaseTablespace space = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get();
+ HBaseTablespace space = (HBaseTablespace) TablespaceManager.getByName("cluster1").get();
HConnection hconn = space.getConnection();
HTableInterface htable = hconn.getTable("external_hbase_table");
@@ -343,7 +343,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
assertTableExists("external_hbase_mapped_table");
- HBaseTablespace space = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get();
+ HBaseTablespace space = (HBaseTablespace) TablespaceManager.getByName("cluster1").get();
HConnection hconn = space.getConnection();
HTableInterface htable = hconn.getTable("external_hbase_table");
@@ -477,7 +477,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
EvalNode evalNodeEq = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
new ConstEval(new TextDatum("021")));
scanNode.setQual(evalNodeEq);
- Tablespace tablespace = TableSpaceManager.getByName("cluster1").get();
+ Tablespace tablespace = TablespaceManager.getByName("cluster1").get();
List<Fragment> fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode);
assertEquals(1, fragments.size());
assertEquals("021", new String(((HBaseFragment)fragments.get(0)).getStartRow()));
@@ -683,6 +683,48 @@ public class TestHBaseTable extends QueryTestCaseBase {
}
@Test
+ public void testInsertValues1() throws Exception {
+ executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int4) " +
+ "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b')").close();
+
+ assertTableExists("hbase_mapped_table");
+ TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+ executeString("insert into hbase_mapped_table select 'aaa', 'a12', 'a34', 1").close();
+ executeString("insert into hbase_mapped_table select 'bbb', 'b12', 'b34', 2").close();
+ executeString("insert into hbase_mapped_table select 'ccc', 'c12', 'c34', 3").close();
+ executeString("insert into hbase_mapped_table select 'ddd', 'd12', 'd34', 4").close();
+
+ HTable htable = null;
+ ResultScanner scanner = null;
+ try {
+ htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+ Scan scan = new Scan();
+ scan.addFamily(Bytes.toBytes("col1"));
+ scan.addFamily(Bytes.toBytes("col2"));
+ scan.addFamily(Bytes.toBytes("col3"));
+ scanner = htable.getScanner(scan);
+
+ assertStrings(resultSetToString(scanner,
+ new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")},
+ new byte[][]{null, Bytes.toBytes("a"), null, Bytes.toBytes("b")},
+ new boolean[]{false, false, false, true}, tableDesc.getSchema()));
+
+ } finally {
+ executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+ if (scanner != null) {
+ scanner.close();
+ }
+
+ if (htable != null) {
+ htable.close();
+ }
+ }
+ }
+
+ @Test
public void testInsertIntoMultiRegion() throws Exception {
executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) TABLESPACE cluster1 " +
"USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
@@ -1301,10 +1343,8 @@ public class TestHBaseTable extends QueryTestCaseBase {
}
}
- private String resultSetToString(ResultScanner scanner,
- byte[][] cfNames, byte[][] qualifiers,
- boolean[] binaries,
- Schema schema) throws Exception {
+ private String resultSetToString(ResultScanner scanner, byte[][] cfNames, byte[][] qualifiers,
+ boolean [] binaries, Schema schema) throws Exception {
StringBuilder sb = new StringBuilder();
Result result = null;
while ( (result = scanner.next()) != null ) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
index 1478690..dd67e06 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
@@ -264,7 +264,7 @@ public class TestJoinQuery extends QueryTestCaseBase {
}
Path dataPath = new Path(table.getUri().toString(), fileIndex + ".csv");
fileIndex++;
- appender = (((FileTablespace)TableSpaceManager.getLocalFs()))
+ appender = (((FileTablespace) TablespaceManager.getLocalFs()))
.getAppender(tableMeta, schema, dataPath);
appender.init();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java b/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
index c714749..265f075 100644
--- a/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
+++ b/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
@@ -20,10 +20,13 @@ package org.apache.tajo.ha;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TpchTestBase;
import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.service.ServiceTracker;
@@ -32,58 +35,51 @@ import org.junit.Test;
import static junit.framework.Assert.assertTrue;
import static junit.framework.TestCase.assertEquals;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertNotEquals;
public class TestHAServiceHDFSImpl {
private TajoTestingCluster cluster;
- private TajoMaster backupMaster;
- private TajoConf conf;
- private TajoClient client;
+ private TajoMaster primaryMaster;
+ private TajoMaster backupMaster;
private Path haPath, activePath, backupPath;
- private String masterAddress;
-
@Test
public final void testAutoFailOver() throws Exception {
- cluster = new TajoTestingCluster(true);
-
- cluster.startMiniCluster(1);
- conf = cluster.getConfiguration();
- client = cluster.newTajoClient();
+ cluster = TpchTestBase.getInstance().getTestingCluster();
try {
FileSystem fs = cluster.getDefaultFileSystem();
- ServiceTracker serviceTracker = ServiceTrackerFactory.get(conf);
- masterAddress = serviceTracker.getUmbilicalAddress().getHostName();
-
- setConfiguration();
+ TajoConf primaryConf = setConfigForHAMaster();
+ primaryMaster = new TajoMaster();
+ primaryMaster.init(primaryConf);
+ primaryMaster.start();
+ TajoConf backupConf = setConfigForHAMaster();
backupMaster = new TajoMaster();
- backupMaster.init(conf);
+ backupMaster.init(backupConf);
backupMaster.start();
- assertNotEquals(cluster.getMaster().getMasterName(), backupMaster.getMasterName());
+ ServiceTracker tracker = ServiceTrackerFactory.get(primaryConf);
+ assertNotEquals(primaryMaster.getMasterName(), backupMaster.getMasterName());
verifySystemDirectories(fs);
assertEquals(2, fs.listStatus(activePath).length);
assertEquals(1, fs.listStatus(backupPath).length);
assertTrue(fs.exists(new Path(activePath, HAConstants.ACTIVE_LOCK_FILE)));
- assertTrue(fs.exists(new Path(activePath, cluster.getMaster().getMasterName().replaceAll(":", "_"))));
+ assertTrue(fs.exists(new Path(activePath, primaryMaster.getMasterName().replaceAll(":", "_"))));
assertTrue(fs.exists(new Path(backupPath, backupMaster.getMasterName().replaceAll(":", "_"))));
- createDatabaseAndTable();
- verifyDataBaseAndTable();
- client.close();
+ createDatabaseAndTable(tracker);
+ verifyDataBaseAndTable(tracker);
- cluster.getMaster().stop();
+ primaryMaster.stop();
- client = cluster.newTajoClient();
- verifyDataBaseAndTable();
+ verifyDataBaseAndTable(tracker);
assertEquals(2, fs.listStatus(activePath).length);
assertEquals(0, fs.listStatus(backupPath).length);
@@ -91,25 +87,23 @@ public class TestHAServiceHDFSImpl {
assertTrue(fs.exists(new Path(activePath, HAConstants.ACTIVE_LOCK_FILE)));
assertTrue(fs.exists(new Path(activePath, backupMaster.getMasterName().replaceAll(":", "_"))));
} finally {
- client.close();
backupMaster.stop();
- cluster.shutdownMiniCluster();
}
}
- private void setConfiguration() {
- conf = cluster.getConfiguration();
+ private TajoConf setConfigForHAMaster() {
+ TajoConf conf = new TajoConf(cluster.getConfiguration());
conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS,
- masterAddress + ":" + NetUtils.getFreeSocketPort());
+ "localhost:" + NetUtils.getFreeSocketPort());
conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
- masterAddress + ":" + NetUtils.getFreeSocketPort());
+ "localhost:" + NetUtils.getFreeSocketPort());
conf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS,
- masterAddress + ":" + NetUtils.getFreeSocketPort());
+ "localhost:" + NetUtils.getFreeSocketPort());
conf.setVar(TajoConf.ConfVars.CATALOG_ADDRESS,
- masterAddress + ":" + NetUtils.getFreeSocketPort());
+ "localhost:" + NetUtils.getFreeSocketPort());
conf.setVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS,
- masterAddress + ":" + NetUtils.getFreeSocketPort());
+ "localhost:" + NetUtils.getFreeSocketPort());
conf.setIntVar(TajoConf.ConfVars.REST_SERVICE_PORT,
NetUtils.getFreeSocketPort());
@@ -126,6 +120,8 @@ public class TestHAServiceHDFSImpl {
conf.setIntVar(TajoConf.ConfVars.WORKER_RPC_SERVER_WORKER_THREAD_NUM, 2);
conf.setIntVar(TajoConf.ConfVars.CATALOG_RPC_SERVER_WORKER_THREAD_NUM, 2);
conf.setIntVar(TajoConf.ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM, 2);
+
+ return conf;
}
private void verifySystemDirectories(FileSystem fs) throws Exception {
@@ -139,14 +135,26 @@ public class TestHAServiceHDFSImpl {
assertTrue(fs.exists(backupPath));
}
- private void createDatabaseAndTable() throws Exception {
- client.executeQuery("CREATE TABLE default.table1 (age int);");
- client.executeQuery("CREATE TABLE default.table2 (age int);");
+ private void createDatabaseAndTable(ServiceTracker tracker) throws Exception {
+ TajoClient client = null;
+ try {
+ client = new TajoClientImpl(tracker);
+ client.executeQuery("CREATE TABLE default.ha_test1 (age int);");
+ client.executeQuery("CREATE TABLE default.ha_test2 (age int);");
+ } finally {
+ IOUtils.cleanup(null, client);
+ }
}
- private void verifyDataBaseAndTable() throws Exception {
- client.existDatabase("default");
- client.existTable("default.table1");
- client.existTable("default.table2");
+ private void verifyDataBaseAndTable(ServiceTracker tracker) throws Exception {
+ TajoClient client = null;
+ try {
+ client = new TajoClientImpl(tracker);
+ client.existDatabase("default");
+ client.existTable("default.ha_test1");
+ client.existTable("default.ha_test2");
+ } finally {
+ IOUtils.cleanup(null, client);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
index fc25c27..3d32c08 100644
--- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
+++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
@@ -66,7 +66,7 @@ public class TestResultSet {
public static void setup() throws Exception {
util = TpchTestBase.getInstance().getTestingCluster();
conf = util.getConfiguration();
- sm = TableSpaceManager.getDefault();
+ sm = TablespaceManager.getDefault();
scoreSchema = new Schema();
scoreSchema.addColumn("deptname", Type.TEXT);
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index 48966bc..7c61cc7 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -34,7 +34,7 @@ import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.util.CommonTestingUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -79,7 +79,7 @@ public class TestExecutionBlockCursor {
}
analyzer = new SQLAnalyzer();
- logicalPlanner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ logicalPlanner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
optimizer = new LogicalOptimizer(conf);
dispatcher = new AsyncDispatcher();
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
index 6322732..e8d59d0 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
@@ -22,18 +22,17 @@ import static org.junit.Assert.*;
import static org.hamcrest.CoreMatchers.*;
import java.io.File;
+import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
-import org.apache.tajo.LocalTajoTestingUtility;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.*;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.benchmark.TPCH;
import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.client.ResultSetUtil;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
@@ -62,256 +61,21 @@ import org.junit.Test;
import com.google.protobuf.ByteString;
-public class TestNonForwardQueryResultSystemScanner {
-
- private class CollectionMatcher<T> extends TypeSafeDiagnosingMatcher<Iterable<? extends T>> {
-
- private final Matcher<? extends T> matcher;
-
- public CollectionMatcher(Matcher<? extends T> matcher) {
- this.matcher = matcher;
- }
-
- @Override
- public void describeTo(Description description) {
- description.appendText("a collection containing ").appendDescriptionOf(this.matcher);
- }
-
- @Override
- protected boolean matchesSafely(Iterable<? extends T> item, Description mismatchDescription) {
- boolean isFirst = true;
- Iterator<? extends T> iterator = item.iterator();
-
- while (iterator.hasNext()) {
- T obj = iterator.next();
- if (this.matcher.matches(obj)) {
- return true;
- }
-
- if (!isFirst) {
- mismatchDescription.appendText(", ");
- }
-
- this.matcher.describeMismatch(obj, mismatchDescription);
- isFirst = false;
- }
- return false;
- }
-
- }
-
- private <T> Matcher<Iterable<? extends T>> hasItem(Matcher<? extends T> matcher) {
- return new CollectionMatcher<T>(matcher);
- }
-
- private static LocalTajoTestingUtility testUtil;
- private static TajoTestingCluster testingCluster;
- private static TajoConf conf;
- private static MasterContext masterContext;
-
- private static SQLAnalyzer analyzer;
- private static LogicalPlanner logicalPlanner;
- private static LogicalOptimizer logicalOptimizer;
-
- private static void setupTestingCluster() throws Exception {
- testUtil = new LocalTajoTestingUtility();
- String[] names, paths;
- Schema[] schemas;
-
- TPCH tpch = new TPCH();
- tpch.loadSchemas();
- tpch.loadQueries();
-
- names = new String[] {"customer", "lineitem", "nation", "orders", "part", "partsupp",
- "region", "supplier", "empty_orders"};
- schemas = new Schema[names.length];
- for (int i = 0; i < names.length; i++) {
- schemas[i] = tpch.getSchema(names[i]);
- }
-
- File file;
- paths = new String[names.length];
- for (int i = 0; i < names.length; i++) {
- file = new File("src/test/tpch/" + names[i] + ".tbl");
- if(!file.exists()) {
- file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + names[i]
- + ".tbl");
- }
- paths[i] = file.getAbsolutePath();
- }
-
- KeyValueSet opt = new KeyValueSet();
- opt.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
- testUtil.setup(names, paths, schemas, opt);
-
- testingCluster = testUtil.getTestingCluster();
- }
-
- @BeforeClass
- public static void setUp() throws Exception {
- setupTestingCluster();
-
- conf = testingCluster.getConfiguration();
- masterContext = testingCluster.getMaster().getContext();
-
- GlobalEngine globalEngine = masterContext.getGlobalEngine();
- analyzer = globalEngine.getAnalyzer();
- logicalPlanner = globalEngine.getLogicalPlanner();
- logicalOptimizer = globalEngine.getLogicalOptimizer();
- }
-
- @AfterClass
- public static void tearDown() throws Exception {
- try {
- Thread.sleep(2000);
- } catch (Exception ignored) {
- }
-
- testUtil.shutdown();
- }
-
- private NonForwardQueryResultScanner getScanner(String sql) throws Exception {
- QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
- String sessionId = UUID.randomUUID().toString();
-
- return getScanner(sql, queryId, sessionId);
- }
-
- private NonForwardQueryResultScanner getScanner(String sql, QueryId queryId, String sessionId) throws Exception {
- QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf);
-
- Expr expr = analyzer.parse(sql);
- LogicalPlan logicalPlan = logicalPlanner.createPlan(queryContext, expr);
- logicalOptimizer.optimize(logicalPlan);
-
- int maxRow = Integer.MAX_VALUE;
- if (logicalPlan.getRootBlock().hasNode(NodeType.LIMIT)) {
- LimitNode limitNode = logicalPlan.getRootBlock().getNode(NodeType.LIMIT);
- maxRow = (int) limitNode.getFetchFirstNum();
- }
-
- NonForwardQueryResultScanner queryResultScanner =
- new NonForwardQueryResultSystemScanner(masterContext, logicalPlan, queryId,
- sessionId, maxRow);
-
- return queryResultScanner;
- }
-
- @Test
- public void testInit() throws Exception {
- QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
- String sessionId = UUID.randomUUID().toString();
- NonForwardQueryResultScanner queryResultScanner =
- getScanner("SELECT SPACE_ID, SPACE_URI FROM INFORMATION_SCHEMA.TABLESPACE",
- queryId, sessionId);
-
- queryResultScanner.init();
-
- assertThat(queryResultScanner.getQueryId(), is(notNullValue()));
- assertThat(queryResultScanner.getLogicalSchema(), is(notNullValue()));
- assertThat(queryResultScanner.getSessionId(), is(notNullValue()));
- assertThat(queryResultScanner.getTableDesc(), is(notNullValue()));
-
- assertThat(queryResultScanner.getQueryId(), is(queryId));
- assertThat(queryResultScanner.getSessionId(), is(sessionId));
-
- assertThat(queryResultScanner.getLogicalSchema().size(), is(2));
- assertThat(queryResultScanner.getLogicalSchema().getColumn("space_id"), is(notNullValue()));
- }
-
- private List<Tuple> getTupleList(RowStoreDecoder decoder, List<ByteString> bytes) {
- List<Tuple> tuples = new ArrayList<Tuple>(bytes.size());
-
- for (ByteString byteString: bytes) {
- Tuple aTuple = decoder.toTuple(byteString.toByteArray());
- tuples.add(aTuple);
- }
-
- return tuples;
- }
-
- private <T> Matcher<Tuple> getTupleMatcher(final int fieldId, final Matcher<T> matcher) {
- return new TypeSafeDiagnosingMatcher<Tuple>() {
-
- @Override
- public void describeTo(Description description) {
- description.appendDescriptionOf(matcher);
- }
-
- @Override
- protected boolean matchesSafely(Tuple item, Description mismatchDescription) {
- Object itemValue = null;
-
- Type type = item.type(fieldId);
- if (type == Type.TEXT) {
- itemValue = item.getText(fieldId);
- } else if (type == Type.INT4) {
- itemValue = item.getInt4(fieldId);
- } else if (type == Type.INT8) {
- itemValue = item.getInt8(fieldId);
- }
-
- if (itemValue != null && matcher.matches(itemValue)) {
- return true;
- }
-
- matcher.describeMismatch(itemValue, mismatchDescription);
- return false;
- }
- };
- }
-
+public class TestNonForwardQueryResultSystemScanner extends QueryTestCaseBase {
@Test
public void testGetNextRowsForAggregateFunction() throws Exception {
- NonForwardQueryResultScanner queryResultScanner =
- getScanner("SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES");
-
- queryResultScanner.init();
-
- List<ByteString> rowBytes = queryResultScanner.getNextRows(100);
-
- assertThat(rowBytes.size(), is(1));
-
- RowStoreDecoder decoder = RowStoreUtil.createDecoder(queryResultScanner.getLogicalSchema());
- List<Tuple> tuples = getTupleList(decoder, rowBytes);
-
- assertThat(tuples.size(), is(1));
- assertThat(tuples, hasItem(getTupleMatcher(0, is(9L))));
+ assertQueryStr("SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES " +
+ "WHERE TABLE_NAME = 'lineitem' OR TABLE_NAME = 'nation' OR TABLE_NAME = 'customer'");
}
-
+
@Test
public void testGetNextRowsForTable() throws Exception {
- NonForwardQueryResultScanner queryResultScanner =
- getScanner("SELECT TABLE_NAME, TABLE_TYPE FROM INFORMATION_SCHEMA.TABLES");
-
- queryResultScanner.init();
-
- List<ByteString> rowBytes = queryResultScanner.getNextRows(100);
-
- assertThat(rowBytes.size(), is(9));
-
- RowStoreDecoder decoder = RowStoreUtil.createDecoder(queryResultScanner.getLogicalSchema());
- List<Tuple> tuples = getTupleList(decoder, rowBytes);;
-
- assertThat(tuples.size(), is(9));
- assertThat(tuples, hasItem(getTupleMatcher(0, is("lineitem"))));
+ assertQueryStr("SELECT TABLE_NAME, TABLE_TYPE FROM INFORMATION_SCHEMA.TABLES " +
+ "WHERE TABLE_NAME = 'lineitem' OR TABLE_NAME = 'nation' OR TABLE_NAME = 'customer'");
}
-
+
@Test
public void testGetClusterDetails() throws Exception {
- NonForwardQueryResultScanner queryResultScanner =
- getScanner("SELECT TYPE FROM INFORMATION_SCHEMA.CLUSTER");
-
- queryResultScanner.init();
-
- List<ByteString> rowBytes = queryResultScanner.getNextRows(100);
-
- assertThat(rowBytes.size(), is(2));
-
- RowStoreDecoder decoder = RowStoreUtil.createDecoder(queryResultScanner.getLogicalSchema());
- List<Tuple> tuples = getTupleList(decoder, rowBytes);
-
- assertThat(tuples.size(), is(2));
- assertThat(tuples, hasItem(getTupleMatcher(0, is("QueryMaster"))));
+ assertQueryStr("SELECT TYPE FROM INFORMATION_SCHEMA.CLUSTER");
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index edddc5a..1351716 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -45,7 +45,7 @@ import org.apache.tajo.plan.serder.PlanProto;
import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.session.Session;
import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.history.HistoryReader;
import org.apache.tajo.util.history.HistoryWriter;
@@ -105,7 +105,7 @@ public class TestKillQuery {
Session session = LocalTajoTestingUtility.createDummySession();
CatalogService catalog = cluster.getMaster().getCatalog();
- LogicalPlanner planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
LogicalOptimizer optimizer = new LogicalOptimizer(conf);
Expr expr = analyzer.parse(queryStr);
LogicalPlan plan = planner.createPlan(defaultContext, expr);
@@ -169,7 +169,7 @@ public class TestKillQuery {
Session session = LocalTajoTestingUtility.createDummySession();
CatalogService catalog = cluster.getMaster().getCatalog();
- LogicalPlanner planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
LogicalOptimizer optimizer = new LogicalOptimizer(conf);
Expr expr = analyzer.parse(queryStr);
LogicalPlan plan = planner.createPlan(defaultContext, expr);
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
index 863c7b5..f48a71e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
+++ b/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
@@ -67,7 +67,7 @@ public class TestRowFile {
TableMeta meta = CatalogUtil.newTableMeta("ROWFILE");
- FileTablespace sm = (FileTablespace) TableSpaceManager.get(cluster.getDefaultFileSystem().getUri()).get();
+ FileTablespace sm = (FileTablespace) TablespaceManager.get(cluster.getDefaultFileSystem().getUri()).get();
Path tablePath = new Path("/test");
Path metaPath = new Path(tablePath, ".meta");
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/resources/results/TestHBaseTable/testInsertValues1.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestHBaseTable/testInsertValues1.result b/tajo-core/src/test/resources/results/TestHBaseTable/testInsertValues1.result
new file mode 100644
index 0000000..45d730a
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestHBaseTable/testInsertValues1.result
@@ -0,0 +1,4 @@
+aaa, a12, {"": "a34"}, 1
+bbb, b12, {"": "b34"}, 2
+ccc, c12, {"": "c34"}, 3
+ddd, d12, {"": "d34"}, 4
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetClusterDetails.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetClusterDetails.result b/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetClusterDetails.result
new file mode 100644
index 0000000..9f12294
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetClusterDetails.result
@@ -0,0 +1,4 @@
+type
+-------------------------------
+QueryMaster
+Worker
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForAggregateFunction.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForAggregateFunction.result b/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForAggregateFunction.result
new file mode 100644
index 0000000..07dd98b
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForAggregateFunction.result
@@ -0,0 +1,3 @@
+?count_2
+-------------------------------
+3
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForTable.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForTable.result b/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForTable.result
new file mode 100644
index 0000000..fd37504
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForTable.result
@@ -0,0 +1,5 @@
+table_name,table_type
+-------------------------------
+customer,EXTERNAL
+lineitem,EXTERNAL
+nation,EXTERNAL
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java
index 17f79da..22f4781 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java
@@ -239,6 +239,14 @@ public class LogicalPlan {
return queryBlocks.get(ROOT_BLOCK);
}
+ public LogicalRootNode getRootNode() {
+ return queryBlocks.get(ROOT_BLOCK).getRoot();
+ }
+
+ public Schema getOutputSchema() {
+ return getRootNode().getOutSchema();
+ }
+
public QueryBlock getBlock(String blockName) {
return queryBlocks.get(blockName);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
index 16ca368..441e047 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -910,6 +910,22 @@ public class PlannerUtil {
}
}
+ public static TableDesc getOutputTableDesc(LogicalPlan plan) {
+ LogicalNode [] found = findAllNodes(plan.getRootNode().getChild(), NodeType.CREATE_TABLE, NodeType.INSERT);
+
+ if (found.length == 0) {
+ return new TableDesc(null, plan.getRootNode().getOutSchema(), "TEXT", new KeyValueSet(), null);
+ } else {
+ StoreTableNode storeNode = (StoreTableNode) found[0];
+ return new TableDesc(
+ storeNode.getTableName(),
+ storeNode.getOutSchema(),
+ storeNode.getStorageType(),
+ storeNode.getOptions(),
+ storeNode.getUri());
+ }
+ }
+
public static TableDesc getTableDesc(CatalogService catalog, LogicalNode node) throws IOException {
if (node.getType() == NodeType.ROOT) {
node = ((LogicalRootNode)node).getChild();
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java
index 0f0cd10..547a6f2 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java
@@ -18,14 +18,46 @@
package org.apache.tajo.storage;
+/**
+ * Format properties
+ */
public class FormatProperty {
- private boolean sortedInsertRequired;
- public FormatProperty(boolean sortedInsertRequired) {
- this.sortedInsertRequired = sortedInsertRequired;
+ /** if this format supports insert operation */
+ private boolean insertable;
+ /** if this format supports direct insertion (e.g., HBASE or JDBC-based storages) */
+ private boolean directInsert;
+ /** if this format supports staging phase */
+ private boolean stagingSupport;
+
+ public FormatProperty(boolean insertable, boolean directInsert, boolean stagingSupport) {
+ this.insertable = insertable;
+ this.stagingSupport = stagingSupport;
+ this.directInsert = directInsert;
+ }
+
+ /**
+ * Return if this format supports staging phase
+ * @return True if this format supports staging phase
+ */
+ public boolean isInsertable() {
+ return insertable;
+ }
+
+ /**
+ * Return if this format supports direct insertion (e.g., HBASE or JDBC-based storages)
+ * @return True if this format supports direct insertion
+ */
+ public boolean directInsertSupported() {
+ return directInsert;
}
- public boolean sortedInsertRequired() {
- return sortedInsertRequired;
+ /**
+ * Return if this format supports staging phase
+ *
+ * @return True if this format supports staging phase
+ */
+ public boolean isStagingSupport() {
+ return stagingSupport;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
index ce573be..67a2f86 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -64,7 +64,7 @@ public class MergeScanner implements Scanner {
long numBytes = 0;
for (Fragment eachFileFragment: rawFragmentList) {
- long fragmentLength = TableSpaceManager.guessFragmentVolume((TajoConf) conf, eachFileFragment);
+ long fragmentLength = TablespaceManager.guessFragmentVolume((TajoConf) conf, eachFileFragment);
if (fragmentLength > 0) {
numBytes += fragmentLength;
fragments.add(eachFileFragment);
@@ -131,7 +131,7 @@ public class MergeScanner implements Scanner {
private Scanner getNextScanner() throws IOException {
if (iterator.hasNext()) {
currentFragment = iterator.next();
- currentScanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, currentFragment, target);
+ currentScanner = TablespaceManager.getLocalFs().getScanner(meta, schema, currentFragment, target);
currentScanner.init();
return currentScanner;
} else {
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
index 12b236f..ef33a8e 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
@@ -31,7 +31,6 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.FileUtil;
import java.io.IOException;
import java.lang.reflect.Constructor;
@@ -158,7 +157,7 @@ public class OldStorageManager {
Constructor<? extends Tablespace> constructor =
(Constructor<? extends Tablespace>) CONSTRUCTOR_CACHE.get(storageManagerClass);
if (constructor == null) {
- constructor = storageManagerClass.getDeclaredConstructor(TableSpaceManager.TABLESPACE_PARAM);
+ constructor = storageManagerClass.getDeclaredConstructor(TablespaceManager.TABLESPACE_PARAM);
constructor.setAccessible(true);
CONSTRUCTOR_CACHE.put(storageManagerClass, constructor);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
index 38d0734..c1db34e 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
@@ -18,20 +18,39 @@
package org.apache.tajo.storage;
+/**
+ * Storage Properties
+ */
public class StorageProperty {
- private boolean movable;
- private boolean writable;
- private boolean insertable;
- private boolean absolutePathAllowed;
+ /** default file format */
+ private final String defaultFormat;
+ /** if this storage supports move operator */
+ private final boolean movable;
+ /** if this storage supports is writable */
+ private final boolean writable;
+ /** if this storage allows use of artibrary paths */
+ private final boolean absolutePathAllowed;
+
+ public StorageProperty(String defaultFormat,
+ boolean movable,
+ boolean writable,
+ boolean absolutePathAllowed) {
- public StorageProperty(boolean movable, boolean writable, boolean isInsertable, boolean absolutePathAllowed) {
+ this.defaultFormat = defaultFormat;
this.movable = movable;
this.writable = writable;
- this.insertable = isInsertable;
this.absolutePathAllowed = absolutePathAllowed;
}
/**
+ * Return default file format
+ * @return Default file format
+ */
+ public String defaultFormat() {
+ return defaultFormat;
+ }
+
+ /**
* Move-like operation is allowed
*
* @return true if move operation is available
@@ -50,18 +69,9 @@ public class StorageProperty {
}
/**
- * this storage supports insert operation?
- *
- * @return true if insert operation is allowed.
- */
- public boolean isInsertable() {
- return insertable;
- }
-
- /**
* Does this storage allows the use of arbitrary absolute paths outside tablespace?
*
- * @return
+ * @return True if this storage allows accesses to artibrary paths.
*/
public boolean isArbitraryPathAllowed() {
return this.absolutePathAllowed;
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
deleted file mode 100644
index ef04509..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
+++ /dev/null
@@ -1,390 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import net.minidev.json.JSONObject;
-import net.minidev.json.parser.JSONParser;
-import net.minidev.json.parser.ParseException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.Pair;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.net.URI;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import static org.apache.tajo.storage.StorageConstants.LOCAL_FS_URI;
-
-/**
- * It handles available table spaces and cache TableSpace instances.
- *
- * Default tablespace must be a filesystem-based one.
- * HDFS and S3 can be a default tablespace if a Tajo cluster is in fully distributed mode.
- * Local file system can be a default tablespace if a Tajo cluster runs on a single machine.
- */
-public class TableSpaceManager implements StorageService {
- private static final Log LOG = LogFactory.getLog(TableSpaceManager.class);
-
- public static final String DEFAULT_CONFIG_FILE = "storage-default.json";
- public static final String SITE_CONFIG_FILE = "storage-site.json";
-
- /** default tablespace name */
- public static final String DEFAULT_TABLESPACE_NAME = "default";
-
- private final static TajoConf systemConf = new TajoConf();
- private final static JSONParser parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE | JSONParser.IGNORE_CONTROL_CHAR);
-
- // The relation ship among name, URI, Tablespaces must be kept 1:1:1.
- protected static final Map<String, URI> SPACES_URIS_MAP = Maps.newHashMap();
- protected static final TreeMap<URI, Tablespace> TABLE_SPACES = Maps.newTreeMap();
-
- protected static final Map<Class<?>, Constructor<?>> CONSTRUCTORS = Maps.newHashMap();
- protected static final Map<String, Class<? extends Tablespace>> TABLE_SPACE_HANDLERS = Maps.newHashMap();
-
- public static final Class [] TABLESPACE_PARAM = new Class [] {String.class, URI.class};
-
- static {
- instance = new TableSpaceManager();
- }
- /**
- * Singleton instance
- */
- private static final TableSpaceManager instance;
-
- private TableSpaceManager() {
- initForDefaultConfig(); // loading storage-default.json
- initSiteConfig(); // storage-site.json will override the configs of storage-default.json
- addWarehouseAsSpace(); // adding a warehouse directory for a default tablespace
- addLocalFsTablespace(); // adding a tablespace using local file system by default
- }
-
- private void addWarehouseAsSpace() {
- Path warehouseDir = TajoConf.getWarehouseDir(systemConf);
- registerTableSpace(DEFAULT_TABLESPACE_NAME, warehouseDir.toUri(), null, true, false);
- }
-
- private void addLocalFsTablespace() {
- if (TABLE_SPACES.headMap(LOCAL_FS_URI, true).firstEntry() == null) {
- String tmpName = UUID.randomUUID().toString();
- registerTableSpace(tmpName, LOCAL_FS_URI, null, false, false);
- }
- }
-
- public static TableSpaceManager getInstance() {
- return instance;
- }
-
- private void initForDefaultConfig() {
- JSONObject json = loadFromConfig(DEFAULT_CONFIG_FILE);
- if (json == null) {
- throw new IllegalStateException("There is no " + SITE_CONFIG_FILE);
- }
- applyConfig(json, false);
- }
-
- private void initSiteConfig() {
- JSONObject json = loadFromConfig(SITE_CONFIG_FILE);
-
- // if there is no storage-site.json file, nothing happen.
- if (json != null) {
- applyConfig(json, true);
- }
- }
-
- private JSONObject loadFromConfig(String fileName) {
- String json;
- try {
- json = FileUtil.readTextFileFromResource(fileName);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- if (json != null) {
- return parseJson(json);
- } else {
- return null;
- }
- }
-
- private static JSONObject parseJson(String json) {
- try {
- return (JSONObject) parser.parse(json);
- } catch (ParseException e) {
- throw new RuntimeException(e);
- }
- }
-
- private void applyConfig(JSONObject json, boolean override) {
- loadStorages(json);
- loadTableSpaces(json, override);
- }
-
- private void loadStorages(JSONObject json) {
- JSONObject spaces = (JSONObject) json.get(KEY_STORAGES);
-
- if (spaces != null) {
- Pair<String, Class<? extends Tablespace>> pair = null;
- for (Map.Entry<String, Object> entry : spaces.entrySet()) {
-
- try {
- pair = extractStorage(entry);
- } catch (ClassNotFoundException e) {
- LOG.warn(e);
- continue;
- }
-
- TABLE_SPACE_HANDLERS.put(pair.getFirst(), pair.getSecond());
- }
- }
- }
-
- private Pair<String, Class<? extends Tablespace>> extractStorage(Map.Entry<String, Object> entry)
- throws ClassNotFoundException {
-
- String storageType = entry.getKey();
- JSONObject storageDesc = (JSONObject) entry.getValue();
- String handlerClass = (String) storageDesc.get(KEY_STORAGE_HANDLER);
-
- return new Pair<String, Class<? extends Tablespace>>(
- storageType,(Class<? extends Tablespace>) Class.forName(handlerClass));
- }
-
- private void loadTableSpaces(JSONObject json, boolean override) {
- JSONObject spaces = (JSONObject) json.get(KEY_SPACES);
-
- if (spaces != null) {
- for (Map.Entry<String, Object> entry : spaces.entrySet()) {
- AddTableSpace(entry.getKey(), (JSONObject) entry.getValue(), override);
- }
- }
- }
-
- public static void AddTableSpace(String spaceName, JSONObject spaceDesc, boolean override) {
- boolean defaultSpace = Boolean.parseBoolean(spaceDesc.getAsString("default"));
- URI spaceUri = URI.create(spaceDesc.getAsString("uri"));
-
- if (defaultSpace) {
- registerTableSpace(DEFAULT_TABLESPACE_NAME, spaceUri, spaceDesc, true, override);
- }
- registerTableSpace(spaceName, spaceUri, spaceDesc, true, override);
- }
-
- private static void registerTableSpace(String spaceName, URI uri, JSONObject spaceDesc,
- boolean visible, boolean override) {
- Tablespace tableSpace = initializeTableSpace(spaceName, uri, visible);
- tableSpace.setVisible(visible);
-
- try {
- tableSpace.init(systemConf);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- putTablespace(tableSpace, override);
-
- // If the arbitrary path is allowed, root uri is also added as a tablespace
- if (tableSpace.getProperty().isArbitraryPathAllowed()) {
- URI rootUri = tableSpace.getRootUri();
- // if there already exists or the rootUri is 'file:/', it won't overwrite the tablespace.
- if (!TABLE_SPACES.containsKey(rootUri) && !rootUri.toString().startsWith(LOCAL_FS_URI.toString())) {
- String tmpName = UUID.randomUUID().toString();
- registerTableSpace(tmpName, rootUri, spaceDesc, false, override);
- }
- }
- }
-
- private static void putTablespace(Tablespace space, boolean override) {
- // It is a device to keep the relationship among name, URI, and tablespace 1:1:1.
-
- boolean nameExist = SPACES_URIS_MAP.containsKey(space.getName());
- boolean uriExist = TABLE_SPACES.containsKey(space.uri);
-
- boolean mismatch = nameExist && !SPACES_URIS_MAP.get(space.getName()).equals(space.getUri());
- mismatch = mismatch || uriExist && TABLE_SPACES.get(space.uri).equals(space);
-
- if (!override && mismatch) {
- throw new RuntimeException("Name or URI of Tablespace must be unique.");
- }
-
- SPACES_URIS_MAP.put(space.getName(), space.getUri());
- // We must guarantee that the same uri results in the same tablespace instance.
- TABLE_SPACES.put(space.getUri(), space);
- }
-
- /**
- * Return length of the fragment.
- * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration.
- *
- * @param conf Tajo system property
- * @param fragment Fragment
- * @return
- */
- public static long guessFragmentVolume(TajoConf conf, Fragment fragment) {
- if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) {
- return conf.getLongVar(TajoConf.ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH);
- } else {
- return fragment.getLength();
- }
- }
-
- public static final String KEY_STORAGES = "storages"; // storages
- public static final String KEY_STORAGE_HANDLER = "handler"; // storages/?/handler
- public static final String KEY_STORAGE_DEFAULT_FORMAT = "default-format"; // storages/?/default-format
-
- public static final String KEY_SPACES = "spaces";
-
- private static Tablespace initializeTableSpace(String spaceName, URI uri, boolean visible) {
- Preconditions.checkNotNull(uri.getScheme(), "URI must include scheme, but it was " + uri);
- Class<? extends Tablespace> clazz = TABLE_SPACE_HANDLERS.get(uri.getScheme());
-
- if (clazz == null) {
- throw new RuntimeException("There is no tablespace for " + uri.toString());
- }
-
- try {
- Constructor<? extends Tablespace> constructor =
- (Constructor<? extends Tablespace>) CONSTRUCTORS.get(clazz);
-
- if (constructor == null) {
- constructor = clazz.getDeclaredConstructor(TABLESPACE_PARAM);
- constructor.setAccessible(true);
- CONSTRUCTORS.put(clazz, constructor);
- }
-
- return constructor.newInstance(new Object[]{spaceName, uri});
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @VisibleForTesting
- public static Optional<Tablespace> addTableSpaceForTest(Tablespace space) {
- Tablespace existing;
- synchronized (SPACES_URIS_MAP) {
- // Remove existing one
- SPACES_URIS_MAP.remove(space.getName());
- existing = TABLE_SPACES.remove(space.getUri());
-
- // Add anotherone for test
- registerTableSpace(space.name, space.uri, null, true, true);
- }
- // if there is an existing one, return it.
- return Optional.fromNullable(existing);
- }
-
- public Iterable<String> getSupportSchemes() {
- return TABLE_SPACE_HANDLERS.keySet();
- }
-
- /**
- * Get tablespace for the given URI. If uri is null, the default tablespace will be returned
- *
- * @param uri Table or Table Fragment URI.
- * @param <T> Tablespace class type
- * @return Tablespace. If uri is null, the default tablespace will be returned.
- */
- public static <T extends Tablespace> Optional<T> get(@Nullable String uri) {
-
- if (uri == null || uri.isEmpty()) {
- return (Optional<T>) Optional.of(getDefault());
- }
-
- Tablespace lastOne = null;
-
- // Find the longest matched one. For example, assume that the caller tries to find /x/y/z, and
- // there are /x and /x/y. In this case, /x/y will be chosen because it is more specific.
- for (Map.Entry<URI, Tablespace> entry: TABLE_SPACES.headMap(URI.create(uri), true).entrySet()) {
- if (uri.startsWith(entry.getKey().toString())) {
- lastOne = entry.getValue();
- }
- }
- return (Optional<T>) Optional.fromNullable(lastOne);
- }
-
- /**
- * Get tablespace for the given URI. If uri is null, the default tablespace will be returned
- *
- * @param uri Table or Table Fragment URI.
- * @param <T> Tablespace class type
- * @return Tablespace. If uri is null, the default tablespace will be returned.
- */
- public static <T extends Tablespace> Optional<T> get(@Nullable URI uri) {
- if (uri == null) {
- return (Optional<T>) Optional.of(getDefault());
- } else {
- return (Optional<T>) get(uri.toString());
- }
- }
-
- /**
- * It returns the default tablespace. This method ensures that it always return the tablespace.
- *
- * @return
- */
- public static <T extends Tablespace> T getDefault() {
- return (T) getByName(DEFAULT_TABLESPACE_NAME).get();
- }
-
- public static <T extends Tablespace> T getLocalFs() {
- return (T) get(LOCAL_FS_URI).get();
- }
-
- public static Optional<? extends Tablespace> getByName(String name) {
- URI uri = SPACES_URIS_MAP.get(name);
- if (uri != null) {
- return Optional.of(TABLE_SPACES.get(uri));
- } else {
- return Optional.absent();
- }
- }
-
- public static Optional<? extends Tablespace> getAnyByScheme(String scheme) {
- for (Map.Entry<URI, Tablespace> entry : TABLE_SPACES.entrySet()) {
- String uriScheme = entry.getKey().getScheme();
- if (uriScheme != null && uriScheme.equalsIgnoreCase(scheme)) {
- return Optional.of(entry.getValue());
- }
- }
-
- return Optional.absent();
- }
-
- @Override
- public URI getTableURI(@Nullable String spaceName, String databaseName, String tableName) {
- Tablespace space = spaceName == null ? getDefault() : getByName(spaceName).get();
- return space.getTableUri(databaseName, tableName);
- }
-
- public static Iterable<Tablespace> getAllTablespaces() {
- return TABLE_SPACES.values();
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
index 77c5d05..52e223d 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
@@ -18,9 +18,11 @@
package org.apache.tajo.storage;
+import com.google.common.base.Optional;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.OverridableConf;
+import org.apache.tajo.QueryVars;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
@@ -149,7 +151,7 @@ public abstract class Tablespace {
*/
public abstract StorageProperty getProperty();
- public abstract FormatProperty getFormatProperty(String dataFormat);
+ public abstract FormatProperty getFormatProperty(TableMeta meta);
/**
* Release storage manager resource
@@ -259,6 +261,14 @@ public abstract class Tablespace {
return scanner;
}
+ public Appender getAppenderForInsertRow(OverridableConf queryContext,
+ TaskAttemptId taskAttemptId,
+ TableMeta meta,
+ Schema schema,
+ Path workDir) throws IOException {
+ return getAppender(queryContext, taskAttemptId, meta, schema, workDir);
+ }
+
/**
* Returns Appender instance.
* @param queryContext Query property.
@@ -395,4 +405,11 @@ public abstract class Tablespace {
return false;
}
}
+
+ public abstract URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException;
+
+ public URI prepareStagingSpace(TajoConf conf, String queryId, OverridableConf context,
+ TableMeta meta) throws IOException {
+ throw new IOException("Staging the output result is not supported in this storage");
+ }
}