You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/06/24 00:49:20 UTC
[7/7] tajo git commit: TAJO-1616: Implement TablespaceManager to load Tablespaces.
TAJO-1616: Implement TablespaceManager to load Tablespaces.
Closes #602
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/6cc7e5c6
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/6cc7e5c6
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/6cc7e5c6
Branch: refs/heads/TAJO-1616
Commit: 6cc7e5c60a1aaf3a12c6fa35213b0b5295a9d1fd
Parents: 0f7ff8f
Author: Hyunsik Choi <hy...@apache.org>
Authored: Tue Jun 23 15:45:53 2015 -0700
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Tue Jun 23 15:45:53 2015 -0700
----------------------------------------------------------------------
CHANGES | 2 +
.../org/apache/tajo/algebra/CreateTable.java | 14 +
.../org/apache/tajo/catalog/CatalogUtil.java | 7 +
.../org/apache/tajo/catalog/DDLBuilder.java | 5 +-
.../java/org/apache/tajo/catalog/TableDesc.java | 8 +-
.../org/apache/tajo/catalog/TestTableDesc.java | 2 +-
.../tajo/catalog/store/HiveCatalogStore.java | 2 +-
.../catalog/store/TestHiveCatalogStore.java | 25 +-
.../tajo/catalog/store/AbstractDBStore.java | 44 +-
.../src/main/resources/schemas/derby/derby.xml | 2 +-
.../main/resources/schemas/mariadb/mariadb.xml | 3 +-
.../src/main/resources/schemas/mysql/mysql.xml | 3 +-
.../main/resources/schemas/oracle/oracle.xml | 3 +-
.../resources/schemas/postgresql/postgresql.xml | 4 +-
.../cli/tsql/commands/DescTableCommand.java | 2 +-
.../main/java/org/apache/tajo/QueryVars.java | 4 +-
.../apache/tajo/storage/StorageConstants.java | 6 +
.../org/apache/tajo/storage/StorageService.java | 37 ++
.../java/org/apache/tajo/util/FileUtil.java | 14 +-
.../org/apache/tajo/util/ReflectionUtil.java | 6 +-
.../org/apache/tajo/engine/parser/SQLParser.g4 | 8 +-
.../apache/tajo/engine/parser/SQLAnalyzer.java | 11 +-
.../engine/planner/PhysicalPlannerImpl.java | 14 +-
.../engine/planner/global/GlobalPlanner.java | 4 +-
.../planner/physical/BSTIndexScanExec.java | 2 +-
.../planner/physical/ColPartitionStoreExec.java | 2 +-
.../planner/physical/PhysicalPlanUtil.java | 2 +-
.../physical/RangeShuffleFileWriteExec.java | 2 +-
.../engine/planner/physical/SeqScanExec.java | 6 +-
.../engine/planner/physical/StoreTableExec.java | 19 +-
.../apache/tajo/engine/query/QueryContext.java | 67 ++-
.../org/apache/tajo/master/GlobalEngine.java | 13 +-
.../java/org/apache/tajo/master/TajoMaster.java | 22 +-
.../tajo/master/TajoMasterClientService.java | 3 +-
.../apache/tajo/master/exec/DDLExecutor.java | 58 ++-
.../exec/NonForwardQueryResultFileScanner.java | 2 +-
.../apache/tajo/master/exec/QueryExecutor.java | 238 +++++----
.../master/exec/prehook/CreateTableHook.java | 10 +-
.../prehook/DistributedQueryHookManager.java | 2 +-
.../master/exec/prehook/InsertIntoHook.java | 8 +-
.../java/org/apache/tajo/querymaster/Query.java | 51 +-
.../tajo/querymaster/QueryMasterTask.java | 72 ++-
.../apache/tajo/querymaster/Repartitioner.java | 77 +--
.../java/org/apache/tajo/querymaster/Stage.java | 12 +-
.../tajo/webapp/QueryExecutorServlet.java | 2 +-
.../org/apache/tajo/worker/LegacyTaskImpl.java | 2 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 5 +-
.../java/org/apache/tajo/worker/TaskImpl.java | 2 +-
.../resources/webapps/admin/catalogview.jsp | 3 +-
.../src/main/resources/webapps/admin/index.jsp | 23 +-
.../org/apache/tajo/BackendTestingUtil.java | 2 +-
.../java/org/apache/tajo/QueryTestCaseBase.java | 4 +-
.../org/apache/tajo/TajoTestingCluster.java | 35 +-
.../org/apache/tajo/cli/tsql/TestTajoCli.java | 4 +-
.../org/apache/tajo/client/TestTajoClient.java | 18 +-
.../apache/tajo/engine/eval/ExprTestBase.java | 3 +-
.../tajo/engine/eval/TestEvalTreeUtil.java | 3 +-
.../engine/planner/TestLogicalOptimizer.java | 4 +-
.../tajo/engine/planner/TestLogicalPlan.java | 3 +-
.../tajo/engine/planner/TestLogicalPlanner.java | 92 ++--
.../tajo/engine/planner/TestPlannerUtil.java | 5 +-
.../planner/physical/TestBNLJoinExec.java | 15 +-
.../planner/physical/TestBSTIndexExec.java | 6 +-
.../planner/physical/TestExternalSortExec.java | 6 +-
.../physical/TestFullOuterHashJoinExec.java | 29 +-
.../physical/TestFullOuterMergeJoinExec.java | 40 +-
.../planner/physical/TestHashAntiJoinExec.java | 10 +-
.../planner/physical/TestHashJoinExec.java | 14 +-
.../planner/physical/TestHashSemiJoinExec.java | 14 +-
.../physical/TestLeftOuterHashJoinExec.java | 31 +-
.../planner/physical/TestMergeJoinExec.java | 10 +-
.../engine/planner/physical/TestNLJoinExec.java | 14 +-
.../planner/physical/TestPhysicalPlanner.java | 56 +--
.../physical/TestProgressExternalSortExec.java | 6 +-
.../physical/TestRightOuterHashJoinExec.java | 20 +-
.../physical/TestRightOuterMergeJoinExec.java | 36 +-
.../engine/planner/physical/TestSortExec.java | 6 +-
.../apache/tajo/engine/query/TestCTASQuery.java | 8 +-
.../tajo/engine/query/TestCreateTable.java | 8 +-
.../tajo/engine/query/TestHBaseTable.java | 226 +++++----
.../tajo/engine/query/TestInsertQuery.java | 6 +-
.../apache/tajo/engine/query/TestJoinQuery.java | 6 +-
.../tajo/engine/query/TestTablePartitions.java | 28 +-
.../org/apache/tajo/jdbc/TestResultSet.java | 4 +-
.../tajo/master/TestExecutionBlockCursor.java | 10 +-
.../apache/tajo/querymaster/TestKillQuery.java | 5 +-
.../org/apache/tajo/storage/TestRowFile.java | 5 +-
.../results/TestHBaseTable/testCATS.result | 100 ----
.../results/TestHBaseTable/testCTAS.result | 100 ++++
.../testInsertIntoUsingPut.result | 4 +-
.../results/TestTajoCli/testDescTable.result | 8 +-
.../testDescTableForNestedSchema.result | 4 +-
.../src/main/conf/storage-site.json.template | 35 ++
tajo-docs/src/main/sphinx/index.rst | 1 +
tajo-docs/src/main/sphinx/storage_plugin.rst | 47 ++
tajo-docs/src/main/sphinx/table_management.rst | 1 +
.../sphinx/table_management/table_overview.rst | 7 +
.../sphinx/table_management/tablespaces.rst | 45 ++
.../org/apache/tajo/plan/LogicalPlanner.java | 64 ++-
.../tajo/plan/logical/CreateTableNode.java | 44 +-
.../apache/tajo/plan/logical/InsertNode.java | 51 +-
.../plan/logical/PartitionedTableScanNode.java | 2 +-
.../org/apache/tajo/plan/logical/ScanNode.java | 2 +-
.../tajo/plan/logical/StoreTableNode.java | 27 ++
.../rewrite/rules/PartitionedTableRewriter.java | 4 +-
.../plan/serder/LogicalNodeDeserializer.java | 17 +-
.../tajo/plan/serder/LogicalNodeSerializer.java | 25 +-
.../org/apache/tajo/plan/util/PlannerUtil.java | 14 +-
tajo-plan/src/main/proto/Plan.proto | 11 +-
tajo-storage/tajo-storage-common/pom.xml | 12 +-
.../org/apache/tajo/storage/FormatProperty.java | 31 ++
.../org/apache/tajo/storage/MergeScanner.java | 5 +-
.../apache/tajo/storage/OldStorageManager.java | 251 ++++++++++
.../apache/tajo/storage/StorageProperty.java | 49 +-
.../org/apache/tajo/storage/StorageUtil.java | 4 +-
.../apache/tajo/storage/TableSpaceManager.java | 480 ++++++++++++-------
.../org/apache/tajo/storage/Tablespace.java | 194 +++++---
.../src/main/resources/storage-default.json | 20 +
tajo-storage/tajo-storage-hbase/pom.xml | 11 +
.../storage/hbase/AbstractHBaseAppender.java | 2 +-
.../storage/hbase/AddSortForInsertRewriter.java | 91 ----
.../tajo/storage/hbase/ColumnMapping.java | 17 +-
.../tajo/storage/hbase/HBaseFragment.java | 28 +-
.../tajo/storage/hbase/HBasePutAppender.java | 11 +-
.../apache/tajo/storage/hbase/HBaseScanner.java | 9 +-
.../tajo/storage/hbase/HBaseTablespace.java | 209 ++++----
.../storage/hbase/SortedInsertRewriter.java | 116 +++++
.../src/main/proto/StorageFragmentProtos.proto | 15 +-
.../tajo/storage/hbase/TestColumnMapping.java | 2 +-
.../storage/hbase/TestHBaseStorageManager.java | 108 -----
.../tajo/storage/hbase/TestHBaseTableSpace.java | 134 ++++++
tajo-storage/tajo-storage-hdfs/pom.xml | 6 +-
.../org/apache/tajo/storage/FileAppender.java | 28 +-
.../org/apache/tajo/storage/FileTablespace.java | 137 ++++--
.../storage/HashShuffleAppenderManager.java | 5 +-
.../tajo/storage/TestCompressionStorages.java | 4 +-
.../tajo/storage/TestDelimitedTextFile.java | 8 +-
.../tajo/storage/TestFileStorageManager.java | 233 ---------
.../apache/tajo/storage/TestFileSystems.java | 2 +-
.../apache/tajo/storage/TestFileTablespace.java | 250 ++++++++++
.../org/apache/tajo/storage/TestLineReader.java | 8 +-
.../apache/tajo/storage/TestMergeScanner.java | 6 +-
.../org/apache/tajo/storage/TestStorages.java | 48 +-
.../apache/tajo/storage/index/TestBSTIndex.java | 58 ++-
.../index/TestSingleCSVFileBSTIndex.java | 4 +-
.../apache/tajo/storage/json/TestJsonSerDe.java | 2 +-
146 files changed, 2966 insertions(+), 1972 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index d64022a..b541cdc 100644
--- a/CHANGES
+++ b/CHANGES
@@ -351,6 +351,8 @@ Release 0.11.0 - unreleased
SUB TASKS
+ TAJO-1616: Implement TablespaceManager to load Tablespaces. (hyunsik)
+
TAJO-1615: Implement TaskManager. (jinho)
TAJO-1599: Implement NodeResourceManager and Status updater. (jinho)
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
index 2d4a241..5d1599d 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
@@ -38,6 +38,8 @@ public class CreateTable extends Expr {
private String tableName;
@Expose @SerializedName("Attributes")
private ColumnDefinition [] tableElements;
+ @Expose @SerializedName("SpaceName")
+ private String spaceName;
@Expose @SerializedName("StorageType")
private String storageType;
@Expose @SerializedName("Location")
@@ -100,6 +102,18 @@ public class CreateTable extends Expr {
this.tableElements = tableElements;
}
+ public boolean hasTableSpaceName() {
+ return spaceName != null;
+ }
+
+ public void setTableSpaceName(String spaceName) {
+ this.spaceName = spaceName;
+ }
+
+ public String getTableSpaceName() {
+ return spaceName;
+ }
+
public boolean hasStorageType() {
return storageType != null;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index a2e4a9d..6c6915b 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -18,6 +18,7 @@
package org.apache.tajo.catalog;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.DataTypeUtil;
@@ -31,6 +32,7 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.exception.InvalidOperationException;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.Pair;
import org.apache.tajo.util.StringUtils;
import org.apache.tajo.util.TUtil;
@@ -227,6 +229,11 @@ public class CatalogUtil {
return sb.toString();
}
+ public static Pair<String, String> separateQualifierAndName(String name) {
+ Preconditions.checkArgument(isFQTableName(name), "Must be a qualified name.");
+ return new Pair<String, String>(extractQualifier(name), extractSimpleName(name));
+ }
+
/**
* Extract a qualification name from an identifier.
*
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
index 65640b9..62dd894 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
@@ -19,7 +19,6 @@
package org.apache.tajo.catalog;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
-import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.util.KeyValueSet;
import java.util.Map;
@@ -32,7 +31,7 @@ public class DDLBuilder {
sb.append("--\n")
.append("-- Name: ").append(CatalogUtil.denormalizeIdentifier(desc.getName())).append("; Type: TABLE;")
.append(" Storage: ").append(desc.getMeta().getStoreType());
- sb.append("\n-- Path: ").append(desc.getPath());
+ sb.append("\n-- Path: ").append(desc.getUri());
sb.append("\n--\n");
sb.append("CREATE EXTERNAL TABLE ").append(CatalogUtil.denormalizeIdentifier(desc.getName()));
buildSchema(sb, desc.getSchema());
@@ -109,7 +108,7 @@ public class DDLBuilder {
}
private static void buildLocationClause(StringBuilder sb, TableDesc desc) {
- sb.append(" LOCATION '").append(desc.getPath()).append("'");
+ sb.append(" LOCATION '").append(desc.getUri()).append("'");
}
private static void buildPartitionClause(StringBuilder sb, TableDesc desc) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
index 17f9146..4700322 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
@@ -89,12 +89,16 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
public String getName() {
return this.tableName;
}
+
+ public boolean hasUri() {
+ return this.uri != null;
+ }
- public void setPath(URI uri) {
+ public void setUri(URI uri) {
this.uri = uri;
}
- public URI getPath() {
+ public URI getUri() {
return this.uri;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java
index 9d84de6..41a0832 100644
--- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java
@@ -78,7 +78,7 @@ public class TestTableDesc {
TableDesc desc = new TableDesc("table1", schema, info, path.toUri());
assertEquals("table1", desc.getName());
- assertEquals(path.toUri(), desc.getPath());
+ assertEquals(path.toUri(), desc.getUri());
assertEquals(info, desc.getMeta());
testClone(desc);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
index 3ea263a..8f23db4 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
@@ -443,7 +443,7 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
table.setTableType(TableType.EXTERNAL_TABLE.name());
table.putToParameters("EXTERNAL", "TRUE");
- Path tablePath = new Path(tableDesc.getPath());
+ Path tablePath = new Path(tableDesc.getUri());
FileSystem fs = tablePath.getFileSystem(conf);
if (fs.isFile(tablePath)) {
LOG.warn("A table path is a file, but HiveCatalogStore does not allow a file path.");
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
index 6e91b89..946d271 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
@@ -43,7 +43,6 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.List;
-import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
import static org.junit.Assert.*;
/**
@@ -105,7 +104,7 @@ public class TestHiveCatalogStore {
TableDesc table1 = new TableDesc(store.getTable(DB_NAME, CUSTOMER));
assertEquals(table.getName(), table1.getName());
- assertEquals(table.getPath(), table1.getPath());
+ assertEquals(table.getUri(), table1.getUri());
assertEquals(table.getSchema().size(), table1.getSchema().size());
for (int i = 0; i < table.getSchema().size(); i++) {
assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
@@ -134,7 +133,7 @@ public class TestHiveCatalogStore {
TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION));
assertEquals(table.getName(), table1.getName());
- assertEquals(table.getPath(), table1.getPath());
+ assertEquals(table.getUri(), table1.getUri());
assertEquals(table.getSchema().size(), table1.getSchema().size());
for (int i = 0; i < table.getSchema().size(); i++) {
assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
@@ -163,7 +162,7 @@ public class TestHiveCatalogStore {
TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION));
assertEquals(table.getName(), table1.getName());
- assertEquals(table.getPath(), table1.getPath());
+ assertEquals(table.getUri(), table1.getUri());
assertEquals(table.getSchema().size(), table1.getSchema().size());
for (int i = 0; i < table.getSchema().size(); i++) {
assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
@@ -197,7 +196,7 @@ public class TestHiveCatalogStore {
TableDesc table1 = new TableDesc(store.getTable(DB_NAME, SUPPLIER));
assertEquals(table.getName(), table1.getName());
- assertEquals(table.getPath(), table1.getPath());
+ assertEquals(table.getUri(), table1.getUri());
assertEquals(table.getSchema().size(), table1.getSchema().size());
for (int i = 0; i < table.getSchema().size(); i++) {
assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
@@ -247,7 +246,7 @@ public class TestHiveCatalogStore {
TableDesc table1 = new TableDesc(store.getTable(DB_NAME, NATION));
assertEquals(table.getName(), table1.getName());
- assertEquals(table.getPath(), table1.getPath());
+ assertEquals(table.getUri(), table1.getUri());
assertEquals(table.getSchema().size(), table1.getSchema().size());
for (int i = 0; i < table.getSchema().size(); i++) {
assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
@@ -261,8 +260,8 @@ public class TestHiveCatalogStore {
assertEquals(partitionSchema.getColumn(i).getSimpleName(), partitionSchema1.getColumn(i).getSimpleName());
}
- testAddPartition(table1.getPath(), NATION, "n_nationkey=10/n_date=20150101");
- testAddPartition(table1.getPath(), NATION, "n_nationkey=20/n_date=20150102");
+ testAddPartition(table1.getUri(), NATION, "n_nationkey=10/n_date=20150101");
+ testAddPartition(table1.getUri(), NATION, "n_nationkey=20/n_date=20150102");
testDropPartition(NATION, "n_nationkey=10/n_date=20150101");
testDropPartition(NATION, "n_nationkey=20/n_date=20150102");
@@ -370,7 +369,7 @@ public class TestHiveCatalogStore {
TableDesc table1 = new TableDesc(store.getTable(DB_NAME, tableName));
FileSystem fs = FileSystem.getLocal(new Configuration());
- assertTrue(fs.exists(new Path(table1.getPath())));
+ assertTrue(fs.exists(new Path(table1.getUri())));
store.dropTable(DB_NAME, tableName);
assertFalse(store.existTable(DB_NAME, tableName));
@@ -395,7 +394,7 @@ public class TestHiveCatalogStore {
TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION));
assertEquals(table.getName(), table1.getName());
- assertEquals(table.getPath(), table1.getPath());
+ assertEquals(table.getUri(), table1.getUri());
assertEquals(table.getSchema().size(), table1.getSchema().size());
for (int i = 0; i < table.getSchema().size(); i++) {
assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
@@ -424,7 +423,7 @@ public class TestHiveCatalogStore {
TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION));
assertEquals(table.getName(), table1.getName());
- assertEquals(table.getPath(), table1.getPath());
+ assertEquals(table.getUri(), table1.getUri());
assertEquals(table.getSchema().size(), table1.getSchema().size());
for (int i = 0; i < table.getSchema().size(); i++) {
assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
@@ -456,7 +455,7 @@ public class TestHiveCatalogStore {
TableDesc table1 = new TableDesc(store.getTable(DB_NAME, CUSTOMER));
assertEquals(table.getName(), table1.getName());
- assertEquals(table.getPath(), table1.getPath());
+ assertEquals(table.getUri(), table1.getUri());
assertEquals(table.getSchema().size(), table1.getSchema().size());
for (int i = 0; i < table.getSchema().size(); i++) {
assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
@@ -491,7 +490,7 @@ public class TestHiveCatalogStore {
TableDesc table1 = new TableDesc(store.getTable(DB_NAME, tableName));
assertEquals(table.getName(), table1.getName());
- assertEquals(table.getPath(), table1.getPath());
+ assertEquals(table.getUri(), table1.getUri());
assertEquals(table.getSchema().size(), table1.getSchema().size());
for (int i = 0; i < table.getSchema().size(); i++) {
assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
index 7fe6ef3..47c2219 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
@@ -752,36 +752,25 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
int dbid = getDatabaseId(databaseName);
- if (table.getIsExternal()) {
- String sql = "INSERT INTO TABLES (DB_ID, " + COL_TABLES_NAME + ", TABLE_TYPE, PATH, STORE_TYPE) VALUES(?, ?, ?, ?, ?) ";
+ String sql = "INSERT INTO TABLES (DB_ID, " + COL_TABLES_NAME +
+ ", TABLE_TYPE, PATH, STORE_TYPE) VALUES(?, ?, ?, ?, ?) ";
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
- pstmt = conn.prepareStatement(sql);
- pstmt.setInt(1, dbid);
- pstmt.setString(2, tableName);
+ pstmt = conn.prepareStatement(sql);
+ pstmt.setInt(1, dbid);
+ pstmt.setString(2, tableName);
+ if (table.getIsExternal()) {
pstmt.setString(3, TableType.EXTERNAL_TABLE.name());
- pstmt.setString(4, table.getPath());
- pstmt.setString(5, table.getMeta().getStoreType());
- pstmt.executeUpdate();
- pstmt.close();
} else {
- String sql = "INSERT INTO TABLES (DB_ID, " + COL_TABLES_NAME + ", TABLE_TYPE, STORE_TYPE) VALUES(?, ?, ?, ?) ";
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
-
- pstmt = conn.prepareStatement(sql);
- pstmt.setInt(1, dbid);
- pstmt.setString(2, tableName);
pstmt.setString(3, TableType.BASE_TABLE.name());
- pstmt.setString(4, table.getMeta().getStoreType());
- pstmt.executeUpdate();
- pstmt.close();
}
+ pstmt.setString(4, table.getPath());
+ pstmt.setString(5, table.getMeta().getStoreType());
+ pstmt.executeUpdate();
+ pstmt.close();
String tidSql =
"SELECT TID from " + TB_TABLES + " WHERE " + COL_DATABASES_PK + "=? AND " + COL_TABLES_NAME + "=?";
@@ -1603,12 +1592,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
tableBuilder.setIsExternal(true);
}
- if (tableType == TableType.BASE_TABLE) {
- tableBuilder.setPath(databaseIdAndUri.getSecond() + "/" + tableName);
- } else {
- tableBuilder.setPath(res.getString(4).trim());
- }
-
+ tableBuilder.setPath(res.getString(4).trim());
storeType = res.getString(5).trim();
res.close();
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
index a4ff00f..e0bd469 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
@@ -24,7 +24,7 @@
* 2 - 2014-06-09: First versioning
* 1- Before 2013-03-20
-->
- <tns:base version="4">
+ <tns:base version="5">
<tns:objects>
<tns:Object order="0" type="table" name="META">
<tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql>
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml
index 79ccd0a..7485da1 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml
@@ -19,12 +19,13 @@
<tns:store xmlns:tns="http://tajo.apache.org/catalogstore" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd ">
<!--
Catalog base version history
+ * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616)
* 4 - 2015-03-27: Partition Schema (TAJO-1284)
* 3 - 2015-03-12: Nested Schema (TAJO-1329)
* 2 - 2014-06-09: First versioning
* 1- Before 2013-03-20
-->
- <tns:base version="4">
+ <tns:base version="5">
<tns:objects>
<tns:Object order="0" type="table" name="META">
<tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql>
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml
index 34337fb..2bde04f 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml
@@ -19,12 +19,13 @@
<tns:store xmlns:tns="http://tajo.apache.org/catalogstore" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd ">
<!--
Catalog base version history
+ * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616)
* 4 - 2015-03-27: Partition Schema (TAJO-1284)
* 3 - 2015-03-12: Nested Schema (TAJO-1329)
* 2 - 2014-06-09: First versioning
* 1- Before 2013-03-20
-->
- <tns:base version="4">
+ <tns:base version="5">
<tns:objects>
<tns:Object order="0" type="table" name="META">
<tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql>
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml
index 323e22c..2778e0c 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml
@@ -19,12 +19,13 @@
<tns:store xmlns:tns="http://tajo.apache.org/catalogstore" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd ">
<!--
Catalog base version history
+ * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616)
* 4 - 2015-03-27: Partition Schema (TAJO-1284)
* 3 - 2015-03-12: Nested Schema (TAJO-1329)
* 2 - 2014-06-09: First versioning
* 1- Before 2013-03-20
-->
- <tns:base version="4">
+ <tns:base version="5">
<tns:objects>
<tns:Object order="0" type="table" name="meta">
<tns:sql><![CDATA[
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml
index 554acd5..0051242 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml
@@ -21,12 +21,14 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd ">
<!--
Catalog base version history
+ * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616)
+ * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616)
* 4 - 2015-03-27: Partition Schema (TAJO-1284)
* 3 - 2015-03-12: Nested Schema (TAJO-1329)
* 2 - 2014-06-09: First versioning
* 1- Before 2013-03-20
-->
- <tns:base version="4">
+ <tns:base version="5">
<tns:objects>
<tns:Object name="META" type="table" order="0">
<tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql>
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java
----------------------------------------------------------------------
diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java
index 6df26b7..6b2905a 100644
--- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java
+++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java
@@ -78,7 +78,7 @@ public class DescTableCommand extends TajoShellCommand {
protected String toFormattedString(TableDesc desc) {
StringBuilder sb = new StringBuilder();
sb.append("\ntable name: ").append(desc.getName()).append("\n");
- sb.append("table path: ").append(desc.getPath()).append("\n");
+ sb.append("table uri: ").append(desc.getUri()).append("\n");
sb.append("store type: ").append(desc.getMeta().getStoreType()).append("\n");
if (desc.getStats() != null) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-common/src/main/java/org/apache/tajo/QueryVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryVars.java b/tajo-common/src/main/java/org/apache/tajo/QueryVars.java
index 55ca700..a6d5d1d 100644
--- a/tajo-common/src/main/java/org/apache/tajo/QueryVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/QueryVars.java
@@ -22,9 +22,11 @@ import org.apache.tajo.validation.Validator;
public enum QueryVars implements ConfigKey {
COMMAND_TYPE,
+ DEFAULT_SPACE_URI,
+ DEFAULT_SPACE_ROOT_URI,
STAGING_DIR,
OUTPUT_TABLE_NAME,
- OUTPUT_TABLE_PATH,
+ OUTPUT_TABLE_URI,
OUTPUT_PARTITIONS,
OUTPUT_OVERWRITE,
OUTPUT_AS_DIRECTORY,
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
index d2c6c1c..a9923a5 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
@@ -20,8 +20,14 @@ package org.apache.tajo.storage;
import org.apache.tajo.TajoConstants;
+import java.net.URI;
+
public class StorageConstants {
+ // Tablespace -------------------------------------------------
+
+ public static final URI LOCAL_FS_URI = URI.create("file:/");
+
// Common table properties -------------------------------------------------
// time zone
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-common/src/main/java/org/apache/tajo/storage/StorageService.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageService.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageService.java
new file mode 100644
index 0000000..1057097
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageService.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import javax.annotation.Nullable;
+import java.net.URI;
+
+/**
+ * TablespaceManager interface for loosely coupled usages
+ */
+public interface StorageService {
+ /**
+ * Get Table URI
+ *
+ * @param spaceName Tablespace name. If it is null, the default space will be used
+ * @param databaseName Database name
+ * @param tableName Table name
+ * @return Table URI
+ */
+ URI getTableURI(@Nullable String spaceName, String databaseName, String tableName);
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
index 3e3d3a2..39f4c29 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
@@ -87,8 +87,20 @@ public class FileUtil {
return ClassLoader.getSystemResource(resource);
}
+ /**
+ * It returns a string from a text file found in classpath.
+ *
+ * @param resource Resource file name
+ * @return String contents if the resource exists. Otherwise, it will return null.
+ * @throws IOException
+ */
public static String readTextFileFromResource(String resource) throws IOException {
- return readTextFromStream(ClassLoader.getSystemResourceAsStream(resource));
+ InputStream stream = ClassLoader.getSystemResourceAsStream(resource);
+ if (stream != null) {
+ return readTextFromStream(stream);
+ } else {
+ return null;
+ }
}
public static String readTextFromStream(InputStream inputStream)
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
index e2def69..5a712c0 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
@@ -25,9 +25,9 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ReflectionUtil {
- private static final Class<?>[] EMPTY_PARAM = new Class[]{};
- private static final Object [] EMPTY_OBJECT = new Object[] {};
- private static final Class<?>[] CONF_PARAM = new Class[]{TajoConf.class};
+ public static final Class<?>[] EMPTY_PARAM = new Class[]{};
+ public static final Object [] EMPTY_OBJECT = new Object[] {};
+ public static final Class<?>[] CONF_PARAM = new Class[]{TajoConf.class};
/**
* Caches of constructors for each class. Pins the classes so they
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
index 3ab11bd..469b2a2 100644
--- a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
+++ b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
@@ -98,11 +98,11 @@ if_exists
;
create_table_statement
- : CREATE EXTERNAL TABLE (if_not_exists)? table_name table_elements USING storage_type=identifier
- (param_clause)? (table_partitioning_clauses)? (LOCATION path=Character_String_Literal)?
- | CREATE TABLE (if_not_exists)? table_name table_elements (USING storage_type=identifier)?
+ : CREATE EXTERNAL TABLE (if_not_exists)? table_name table_elements (TABLESPACE spacename=identifier)? USING storage_type=identifier
+ (param_clause)? (table_partitioning_clauses)? (LOCATION uri=Character_String_Literal)?
+ | CREATE TABLE (if_not_exists)? table_name table_elements (TABLESPACE spacename=identifier)? (USING storage_type=identifier)?
(param_clause)? (table_partitioning_clauses)? (AS query_expression)?
- | CREATE TABLE (if_not_exists)? table_name (USING storage_type=identifier)?
+ | CREATE TABLE (if_not_exists)? table_name (TABLESPACE spacename=identifier)? (USING storage_type=identifier)?
(param_clause)? (table_partitioning_clauses)? AS query_expression
| CREATE TABLE (if_not_exists)? table_name LIKE like_table_name=table_name
;
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index 7c99868..62bb0f9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -1252,9 +1252,9 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
createTable.setTableElements(elements);
createTable.setStorageType(storageType);
- if (PlannerUtil.isFileStorageType(storageType)) {
- String path = stripQuote(ctx.path.getText());
- createTable.setLocation(path);
+ if (checkIfExist(ctx.LOCATION())) {
+ String uri = stripQuote(ctx.uri.getText());
+ createTable.setLocation(uri);
}
} else {
if (checkIfExist(ctx.table_elements())) {
@@ -1262,6 +1262,11 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
createTable.setTableElements(elements);
}
+ if (checkIfExist(ctx.TABLESPACE())) {
+ String spaceName = ctx.spacename.getText();
+ createTable.setTableSpaceName(spaceName);
+ }
+
if (checkIfExist(ctx.USING())) {
String fileType = ctx.storage_type.getText();
createTable.setStorageType(fileType);
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 30cb24f..f0b2f5e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.SessionVars;
+import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -251,7 +252,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
FragmentProto[] fragmentProtos = ctx.getTables(tableId);
List<Fragment> fragments = FragmentConvertor.convert(ctx.getConf(), fragmentProtos);
for (Fragment frag : fragments) {
- size += Tablespace.getFragmentLength(ctx.getConf(), frag);
+ size += TableSpaceManager.guessFragmentVolume(ctx.getConf(), frag);
}
}
return size;
@@ -924,9 +925,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
if (broadcastFlag) {
PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode;
List<Fragment> fileFragments = TUtil.newList();
- FileTablespace fileStorageManager = (FileTablespace) TableSpaceManager.getFileStorageManager(ctx.getConf());
+
+ FileTablespace space = (FileTablespace) TableSpaceManager.get(scanNode.getTableDesc().getUri()).get();
for (Path path : partitionedTableScanNode.getInputPaths()) {
- fileFragments.addAll(TUtil.newList(fileStorageManager.split(scanNode.getCanonicalName(), path)));
+ fileFragments.addAll(TUtil.newList(space.split(scanNode.getCanonicalName(), path)));
}
FragmentProto[] fragments =
@@ -1188,8 +1190,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
FragmentConvertor.convert(ctx.getConf(), fragmentProtos);
String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys());
- FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(ctx.getConf());
- Path indexPath = new Path(sm.getTablePath(annotation.getTableName()), "index");
+ FileTablespace sm = (FileTablespace) TableSpaceManager.get(fragments.get(0).getPath().toUri()).get();
+ String dbName = CatalogUtil.extractQualifier(annotation.getTableName());
+ String simpleName = CatalogUtil.extractSimpleName(annotation.getTableName());
+ Path indexPath = new Path(new Path(sm.getTableUri(dbName, simpleName)), "index");
TupleComparator comp = new BaseTupleComparator(annotation.getKeySchema(),
annotation.getSortKeys());
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 89e887a..ba4833b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -48,6 +48,7 @@ import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.rewrite.rules.ProjectionPushDownRule;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
+import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.ReflectionUtil;
import org.apache.tajo.util.TUtil;
@@ -170,7 +171,8 @@ public class GlobalPlanner {
"Channel schema (" + channel.getSrcId().getId() + " -> " + channel.getTargetId().getId() +
") is not initialized");
TableMeta meta = new TableMeta(channel.getStoreType(), new KeyValueSet());
- TableDesc desc = new TableDesc(channel.getSrcId().toString(), channel.getSchema(), meta, new Path("/").toUri());
+ TableDesc desc = new TableDesc(
+ channel.getSrcId().toString(), channel.getSchema(), meta, StorageConstants.LOCAL_FS_URI);
ScanNode scanNode = plan.createNode(ScanNode.class);
scanNode.init(desc);
return scanNode;
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index bc6975a..54abca8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -56,7 +56,7 @@ public class BSTIndexScanExec extends PhysicalExec {
this.qual = scanNode.getQual();
this.datum = datum;
- this.fileScanner = TableSpaceManager.getSeekableScanner(context.getConf(),
+ this.fileScanner = OldStorageManager.getSeekableScanner(context.getConf(),
scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, outSchema);
this.fileScanner.init();
this.projector = new Projector(context, inSchema, outSchema, scanNode.getTargets());
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
index 3121671..969998c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
@@ -165,7 +165,7 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec {
actualFilePath = new Path(lastFileName + "_" + suffixId);
}
- appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(context.getConf()))
+ appender = ((FileTablespace) TableSpaceManager.get(lastFileName.toUri()).get())
.getAppender(meta, outSchema, actualFilePath);
appender.enableStats();
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
index d240edb..deda498 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
@@ -74,7 +74,7 @@ public class PhysicalPlanUtil {
*/
public static CatalogProtos.FragmentProto[] getNonZeroLengthDataFiles(TajoConf tajoConf,TableDesc tableDesc,
int fileIndex, int numResultFiles) throws IOException {
- Path path = new Path(tableDesc.getPath());
+ Path path = new Path(tableDesc.getUri());
FileSystem fs = path.getFileSystem(tajoConf);
//In the case of partitioned table, we should return same partition key data files.
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index 3dd1cd9..fb29e4f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -77,7 +77,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
context.getDataChannel().getStoreType() : "RAW");
FileSystem fs = new RawLocalFileSystem();
fs.mkdirs(storeTablePath);
- this.appender = (FileAppender) ((FileTablespace) TableSpaceManager.getFileStorageManager(context.getConf()))
+ this.appender = (FileAppender) ((FileTablespace) TableSpaceManager.getDefault())
.getAppender(meta, outSchema, new Path(storeTablePath, "output"));
this.appender.enableStats();
this.appender.init();
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index b01af6c..d2ae3bd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -202,10 +202,8 @@ public class SeqScanExec extends ScanExec {
FragmentConvertor.convert(context.getConf(), fragments), projected
);
} else {
- Tablespace tablespace = TableSpaceManager.getStorageManager(
- context.getConf(), plan.getTableDesc().getMeta().getStoreType());
- this.scanner = tablespace.getScanner(meta,
- plan.getPhysicalSchema(), fragments[0], projected);
+ Tablespace tablespace = TableSpaceManager.get(plan.getTableDesc().getUri()).get();
+ this.scanner = tablespace.getScanner(meta, plan.getPhysicalSchema(), fragments[0], projected);
}
scanner.init();
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index 5b17eee..dd8768e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -18,6 +18,7 @@
package org.apache.tajo.engine.planner.physical;
+import com.google.common.base.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -35,6 +36,7 @@ import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
+import java.net.URI;
/**
* This is a physical executor to store a table part into a specified storage.
@@ -90,17 +92,26 @@ public class StoreTableExec extends UnaryPhysicalExec {
lastFileName = new Path(lastFileName + "_" + suffixId);
}
- appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(context.getConf()))
- .getAppender(meta, appenderSchema, lastFileName);
+ Optional<FileTablespace> spaceRes = TableSpaceManager.get(lastFileName.toUri());
+ if (!spaceRes.isPresent()) {
+ throw new IllegalStateException("No Tablespace for " + lastFileName.toUri());
+ }
+
+ FileTablespace space = spaceRes.get();
+ appender = space.getAppender(meta, appenderSchema, lastFileName);
if (suffixId > 0) {
LOG.info(prevFile + " exceeds " + SessionVars.MAX_OUTPUT_FILE_SIZE.keyname() + " (" + maxPerFileSize + " MB), " +
"The remain output will be written into " + lastFileName.toString());
}
} else {
- appender = TableSpaceManager.getStorageManager(context.getConf(), meta.getStoreType()).getAppender(
+ Path stagingDir = context.getQueryContext().getStagingDir();
+ appender = TableSpaceManager.get(stagingDir.toUri()).get().getAppender(
context.getQueryContext(),
- context.getTaskId(), meta, appenderSchema, context.getQueryContext().getStagingDir());
+ context.getTaskId(),
+ meta,
+ appenderSchema,
+ stagingDir);
}
appender.enableStats();
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
index ee50221..7696c6c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
@@ -28,6 +28,8 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.session.Session;
+import java.net.URI;
+
import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetProto;
/**
@@ -63,6 +65,40 @@ public class QueryContext extends OverridableConf {
return get(SessionVars.USERNAME);
}
+ /**
+ * Set the default tablespace uri
+ *
+ * @param uri The default tablespace uri
+ */
+ public void setDefaultSpaceUri(URI uri) {
+ put(QueryVars.DEFAULT_SPACE_URI, uri.toString());
+ }
+
+ /**
+ * Return the default tablespace uri
+ */
+ public URI getDefaultSpaceUri() {
+ String strVal = get(QueryVars.DEFAULT_SPACE_URI, "");
+ return strVal != null && !strVal.isEmpty() ? URI.create(strVal) : null;
+ }
+
+ /**
+ * Set the root uri of the default tablespace
+ *
+ * @param uri The root uri of the default tablespace
+ */
+ public void setDefaultSpaceRootUri(URI uri) {
+ put(QueryVars.DEFAULT_SPACE_ROOT_URI, uri.toString());
+ }
+
+ /**
+ * Return the root of the default tablespace
+ */
+ public URI getDefaultSpaceRootUri() {
+ String strVal = get(QueryVars.DEFAULT_SPACE_ROOT_URI, "");
+ return strVal != null && !strVal.isEmpty() ? URI.create(strVal) : null;
+ }
+
public void setStagingDir(Path path) {
put(QueryVars.STAGING_DIR, path.toUri().toString());
}
@@ -82,24 +118,33 @@ public class QueryContext extends OverridableConf {
}
/**
- * The fact that QueryContext has an output path means this query will write the output to a specific directory.
- * In other words, this query is 'CREATE TABLE' or 'INSERT (OVERWRITE) INTO (<table name>|LOCATION)' statement.
+ * Check if the final output table's uri is set. It will be set if a query is a CTAS or INSERT (OVERWRITE) INTO statement
*
- * @return
+ * @return True if an output table uri is set. Otherwise, it will return false
*/
- public boolean hasOutputPath() {
- return containsKey(QueryVars.OUTPUT_TABLE_PATH);
+ public boolean hasOutputTableUri() {
+ return containsKey(QueryVars.OUTPUT_TABLE_URI);
}
- public void setOutputPath(Path path) {
- if (path != null) {
- put(QueryVars.OUTPUT_TABLE_PATH, path.toUri().toString());
+ /**
+ * Set the final output table uri
+ *
+ * @param uri The final output table uri. If it is null, nothing is set.
+ */
+ public void setOutputPath(URI uri) {
+ if (uri != null) {
+ put(QueryVars.OUTPUT_TABLE_URI, uri.toString());
}
}
- public Path getOutputPath() {
- String strVal = get(QueryVars.OUTPUT_TABLE_PATH);
- return strVal != null ? new Path(strVal) : null;
+ /**
+ * Get the final output table uri
+ *
+ * @return The final output table uri
+ */
+ public URI getOutputTableUri() {
+ String strVal = get(QueryVars.OUTPUT_TABLE_URI);
+ return strVal != null ? URI.create(strVal) : null;
}
public boolean hasPartition() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 2cd585f..e833884 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -53,7 +53,6 @@ import org.apache.tajo.plan.verifier.LogicalPlanVerifier;
import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier;
import org.apache.tajo.plan.verifier.VerificationState;
import org.apache.tajo.plan.verifier.VerifyException;
-import org.apache.tajo.storage.Tablespace;
import org.apache.tajo.storage.TableSpaceManager;
import org.apache.tajo.util.CommonTestingUtil;
@@ -68,7 +67,6 @@ public class GlobalEngine extends AbstractService {
private final static Log LOG = LogFactory.getLog(GlobalEngine.class);
private final MasterContext context;
- private final Tablespace sm;
private SQLAnalyzer analyzer;
private CatalogService catalog;
@@ -84,7 +82,6 @@ public class GlobalEngine extends AbstractService {
super(GlobalEngine.class.getName());
this.context = context;
this.catalog = context.getCatalog();
- this.sm = context.getStorageManager();
this.ddlExecutor = new DDLExecutor(context);
this.queryExecutor = new QueryExecutor(context, ddlExecutor);
@@ -94,7 +91,7 @@ public class GlobalEngine extends AbstractService {
try {
analyzer = new SQLAnalyzer();
preVerifier = new PreLogicalPlanVerifier(context.getCatalog());
- planner = new LogicalPlanner(context.getCatalog());
+ planner = new LogicalPlanner(context.getCatalog(), TableSpaceManager.getInstance());
optimizer = new LogicalOptimizer(context.getConf());
annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());
} catch (Throwable t) {
@@ -143,6 +140,10 @@ public class GlobalEngine extends AbstractService {
private QueryContext createQueryContext(Session session) {
QueryContext newQueryContext = new QueryContext(context.getConf(), session);
+ // Set default space uri and its root uri
+ newQueryContext.setDefaultSpaceUri(TableSpaceManager.getDefault().getUri());
+ newQueryContext.setDefaultSpaceRootUri(TableSpaceManager.getDefault().getRootUri());
+
String tajoTest = System.getProperty(CommonTestingUtil.TAJO_TEST_KEY);
if (tajoTest != null && tajoTest.equalsIgnoreCase(CommonTestingUtil.TAJO_TEST_TRUE)) {
newQueryContext.putAll(CommonTestingUtil.getSessionVarsForTest());
@@ -302,8 +303,8 @@ public class GlobalEngine extends AbstractService {
InsertNode iNode = rootNode.getChild();
Schema outSchema = iNode.getChild().getOutSchema();
- TableSpaceManager.getStorageManager(queryContext.getConf(), storeType)
- .verifyInsertTableSchema(tableDesc, outSchema);
+ TableSpaceManager.get(tableDesc.getUri()).get().verifySchemaToWrite(tableDesc, outSchema);
+
} catch (Throwable t) {
state.addVerification(t.getMessage());
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index c41fdde..e1e85dd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -21,7 +21,10 @@ package org.apache.tajo.master;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.service.CompositeService;
@@ -53,8 +56,6 @@ import org.apache.tajo.rule.SelfDiagnosisRuleSession;
import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.session.SessionManager;
-import org.apache.tajo.storage.Tablespace;
-import org.apache.tajo.storage.TableSpaceManager;
import org.apache.tajo.util.*;
import org.apache.tajo.util.history.HistoryReader;
import org.apache.tajo.util.history.HistoryWriter;
@@ -114,7 +115,6 @@ public class TajoMaster extends CompositeService {
private CatalogServer catalogServer;
private CatalogService catalog;
- private Tablespace storeManager;
private GlobalEngine globalEngine;
private AsyncDispatcher dispatcher;
private TajoMasterClientService tajoMasterClientService;
@@ -183,7 +183,6 @@ public class TajoMaster extends CompositeService {
// check the system directory and create if they are not created.
checkAndInitializeSystemDirectories();
diagnoseTajoMaster();
- this.storeManager = TableSpaceManager.getFileStorageManager(systemConf);
catalogServer = new CatalogServer(loadFunctions());
addIfService(catalogServer);
@@ -211,7 +210,14 @@ public class TajoMaster extends CompositeService {
throw e;
}
- super.serviceInit(systemConf);
+ // Try to start up all services in TajoMaster.
+ // If any of them fails, the master prints out the error and shuts down immediately.
+ try {
+ super.serviceInit(systemConf);
+ } catch (Throwable t) {
+ t.printStackTrace();
+ System.exit(1);
+ }
LOG.info("Tajo Master is initialized.");
}
@@ -477,10 +483,6 @@ public class TajoMaster extends CompositeService {
return globalEngine;
}
- public Tablespace getStorageManager() {
- return storeManager;
- }
-
public QueryCoordinatorService getTajoMasterService() {
return tajoMasterService;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 31eecdc..7dbe815 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -844,8 +844,7 @@ public class TajoMasterClientService extends AbstractService {
TableDesc desc;
try {
desc = context.getGlobalEngine().getDDLExecutor().createTable(queryContext, request.getName(),
- meta.getStoreType(), schema,
- meta, path, true, partitionDesc, false);
+ null, meta.getStoreType(), schema, meta, path.toUri(), true, partitionDesc, false);
} catch (Exception e) {
return TableResponse.newBuilder()
.setResultCode(ResultCode.ERROR)
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
index 93c950e..5e0e639 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
@@ -36,11 +36,12 @@ import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.TableSpaceManager;
import org.apache.tajo.storage.Tablespace;
import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.storage.TableSpaceManager;
import java.io.IOException;
+import java.net.URI;
import java.util.ArrayList;
import java.util.List;
@@ -54,12 +55,10 @@ public class DDLExecutor {
private final TajoMaster.MasterContext context;
private final CatalogService catalog;
- private final Tablespace tablespace;
public DDLExecutor(TajoMaster.MasterContext context) {
this.context = context;
this.catalog = context.getCatalog();
- this.tablespace = context.getStorageManager();
}
public boolean execute(QueryContext queryContext, LogicalPlan plan) throws IOException {
@@ -202,17 +201,31 @@ public class DDLExecutor {
}
if(PlannerUtil.isFileStorageType(createTable.getStorageType()) && createTable.isExternal()){
- Preconditions.checkState(createTable.hasPath(), "ERROR: LOCATION must be given.");
+ Preconditions.checkState(createTable.hasUri(), "ERROR: LOCATION must be given.");
}
- return createTable(queryContext, createTable.getTableName(), createTable.getStorageType(),
- createTable.getTableSchema(), meta, createTable.getPath(), createTable.isExternal(),
- createTable.getPartitionMethod(), ifNotExists);
+ return createTable(
+ queryContext,
+ createTable.getTableName(),
+ createTable.getTableSpaceName(),
+ createTable.getStorageType(),createTable.getTableSchema(),
+ meta,
+ createTable.getUri(),
+ createTable.isExternal(),
+ createTable.getPartitionMethod(),
+ ifNotExists);
}
- public TableDesc createTable(QueryContext queryContext, String tableName, String storeType,
- Schema schema, TableMeta meta, Path path, boolean isExternal,
- PartitionMethodDesc partitionDesc, boolean ifNotExists) throws IOException {
+ public TableDesc createTable(QueryContext queryContext,
+ String tableName,
+ @Nullable String tableSpaceName,
+ @Nullable String storeType,
+ Schema schema,
+ TableMeta meta,
+ @Nullable URI uri,
+ boolean isExternal,
+ @Nullable PartitionMethodDesc partitionDesc,
+ boolean ifNotExists) throws IOException {
String databaseName;
String simpleTableName;
if (CatalogUtil.isFQTableName(tableName)) {
@@ -232,18 +245,28 @@ public class DDLExecutor {
LOG.info("relation \"" + qualifiedName + "\" is already exists." );
return catalog.getTableDesc(databaseName, simpleTableName);
} else {
- throw new AlreadyExistsTableException(CatalogUtil.buildFQName(databaseName, tableName));
+ throw new AlreadyExistsTableException(qualifiedName);
}
}
- TableDesc desc = new TableDesc(CatalogUtil.buildFQName(databaseName, simpleTableName),
- schema, meta, (path != null ? path.toUri(): null), isExternal);
+ Tablespace tableSpace;
+ if (tableSpaceName != null) {
+ tableSpace = TableSpaceManager.getByName(tableSpaceName).get();
+ } else if (uri != null) {
+ tableSpace = TableSpaceManager.get(uri).get();
+ } else {
+ tableSpace = TableSpaceManager.getDefault();
+ }
+
+ TableDesc desc;
+ URI tableUri = isExternal ? uri : tableSpace.getTableUri(databaseName, simpleTableName);
+ desc = new TableDesc(qualifiedName, schema, meta, tableUri, isExternal);
if (partitionDesc != null) {
desc.setPartitionMethod(partitionDesc);
}
- TableSpaceManager.getStorageManager(queryContext.getConf(), storeType).createTable(desc, ifNotExists);
+ tableSpace.createTable(desc, ifNotExists);
if (catalog.createTable(desc)) {
LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")");
@@ -290,8 +313,7 @@ public class DDLExecutor {
if (purge) {
try {
- TableSpaceManager.getStorageManager(queryContext.getConf(),
- tableDesc.getMeta().getStoreType()).purgeTable(tableDesc);
+ TableSpaceManager.get(tableDesc.getUri()).get().purgeTable(tableDesc);
} catch (IOException e) {
throw new InternalError(e.getMessage());
}
@@ -330,7 +352,7 @@ public class DDLExecutor {
Path warehousePath = new Path(TajoConf.getWarehouseDir(context.getConf()), databaseName);
TableDesc tableDesc = catalog.getTableDesc(databaseName, simpleTableName);
- Path tablePath = new Path(tableDesc.getPath());
+ Path tablePath = new Path(tableDesc.getUri());
if (tablePath.getParent() == null ||
!tablePath.getParent().toUri().getPath().equals(warehousePath.toUri().getPath())) {
throw new IOException("Can't truncate external table:" + eachTableName + ", data dir=" + tablePath +
@@ -340,7 +362,7 @@ public class DDLExecutor {
}
for (TableDesc eachTable: tableDescList) {
- Path path = new Path(eachTable.getPath());
+ Path path = new Path(eachTable.getUri());
LOG.info("Truncate table: " + eachTable.getName() + ", delete all data files in " + path);
FileSystem fs = path.getFileSystem(context.getConf());
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
index 8f6c6f9..ae57453 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
@@ -101,7 +101,7 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc
}
private void initSeqScanExec() throws IOException {
- Tablespace tablespace = TableSpaceManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType());
+ Tablespace tablespace = TableSpaceManager.get(tableDesc.getUri()).get();
List<Fragment> fragments = null;
setPartition(tablespace);
fragments = tablespace.getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN);
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 281edad..480f45c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -43,24 +43,26 @@ import org.apache.tajo.engine.planner.physical.StoreTableExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
-import org.apache.tajo.master.*;
+import org.apache.tajo.master.QueryInfo;
+import org.apache.tajo.master.QueryManager;
+import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.exec.prehook.CreateTableHook;
import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
import org.apache.tajo.master.exec.prehook.InsertIntoHook;
-import org.apache.tajo.plan.expr.EvalContext;
-import org.apache.tajo.plan.expr.GeneralFunctionEval;
-import org.apache.tajo.plan.function.python.PythonScriptEngine;
-import org.apache.tajo.plan.function.python.TajoScriptEngine;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
-import org.apache.tajo.querymaster.*;
-import org.apache.tajo.session.Session;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.Target;
+import org.apache.tajo.plan.expr.EvalContext;
import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.expr.GeneralFunctionEval;
+import org.apache.tajo.plan.function.python.PythonScriptEngine;
+import org.apache.tajo.plan.function.python.TajoScriptEngine;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.verifier.VerifyException;
+import org.apache.tajo.querymaster.Query;
+import org.apache.tajo.querymaster.QueryMasterTask;
+import org.apache.tajo.session.Session;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.ProtoUtil;
import org.apache.tajo.worker.TaskAttemptContext;
@@ -329,104 +331,108 @@ public class QueryExecutor {
}
private void insertNonFromQuery(QueryContext queryContext,
- InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder)
- throws Exception {
- String nodeUniqName = insertNode.getTableName() == null ? insertNode.getPath().getName() : insertNode.getTableName();
- String queryId = nodeUniqName + "_" + System.currentTimeMillis();
-
- FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf());
- Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), queryId.toString(), queryContext);
- Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
-
- TableDesc tableDesc = null;
- Path finalOutputDir = null;
- if (insertNode.getTableName() != null) {
- tableDesc = this.catalog.getTableDesc(insertNode.getTableName());
- finalOutputDir = new Path(tableDesc.getPath());
- } else {
- finalOutputDir = insertNode.getPath();
- }
-
- TaskAttemptContext taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, stagingDir);
- taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000"));
-
- EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
- StoreTableExec exec = new StoreTableExec(taskAttemptContext, insertNode, evalExprExec);
+ InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder) {
try {
- exec.init();
- exec.next();
- } finally {
- exec.close();
- }
+ String nodeUniqName = insertNode.getTableName() == null ? new Path(insertNode.getUri()).getName() :
+ insertNode.getTableName();
+ String queryId = nodeUniqName + "_" + System.currentTimeMillis();
+
+ FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf());
+ Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), queryId.toString(), queryContext);
+ Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+
+ TableDesc tableDesc = null;
+ Path finalOutputDir;
+ if (insertNode.getTableName() != null) {
+ tableDesc = this.catalog.getTableDesc(insertNode.getTableName());
+ finalOutputDir = new Path(tableDesc.getUri());
+ } else {
+ finalOutputDir = new Path(insertNode.getUri());
+ }
+
+ TaskAttemptContext taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, stagingDir);
+ taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000"));
- if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO
- // it moves the original table into the temporary location.
- // Then it moves the new result table into the original table location.
- // Upon failed, it recovers the original table if possible.
- boolean movedToOldTable = false;
- boolean committed = false;
- Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
+ EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
+ StoreTableExec exec = new StoreTableExec(taskAttemptContext, insertNode, evalExprExec);
try {
- if (fs.exists(finalOutputDir)) {
- fs.rename(finalOutputDir, oldTableDir);
- movedToOldTable = fs.exists(oldTableDir);
- } else { // if the parent does not exist, make its parent directory.
- fs.mkdirs(finalOutputDir.getParent());
- }
- fs.rename(stagingResultDir, finalOutputDir);
- committed = fs.exists(finalOutputDir);
- } catch (IOException ioe) {
- // recover the old table
- if (movedToOldTable && !committed) {
- fs.rename(oldTableDir, finalOutputDir);
- }
+ exec.init();
+ exec.next();
+ } finally {
+ exec.close();
}
- } else {
- FileStatus[] files = fs.listStatus(stagingResultDir);
- for (FileStatus eachFile: files) {
- Path targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName());
- if (fs.exists(targetFilePath)) {
- targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName() + "_" + System.currentTimeMillis());
+
+ if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO
+ // First, move the original table into a temporary location.
+ // Then, move the new result table into the original table location.
+ // Upon failure, recover the original table if possible.
+ boolean movedToOldTable = false;
+ boolean committed = false;
+ Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
+ try {
+ if (fs.exists(finalOutputDir)) {
+ fs.rename(finalOutputDir, oldTableDir);
+ movedToOldTable = fs.exists(oldTableDir);
+ } else { // if the parent does not exist, make its parent directory.
+ fs.mkdirs(finalOutputDir.getParent());
+ }
+ fs.rename(stagingResultDir, finalOutputDir);
+ committed = fs.exists(finalOutputDir);
+ } catch (IOException ioe) {
+ // recover the old table
+ if (movedToOldTable && !committed) {
+ fs.rename(oldTableDir, finalOutputDir);
+ }
+ }
+ } else {
+ FileStatus[] files = fs.listStatus(stagingResultDir);
+ for (FileStatus eachFile : files) {
+ Path targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName());
+ if (fs.exists(targetFilePath)) {
+ targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName() + "_" + System.currentTimeMillis());
+ }
+ fs.rename(eachFile.getPath(), targetFilePath);
}
- fs.rename(eachFile.getPath(), targetFilePath);
}
- }
- if (insertNode.hasTargetTable()) {
- TableStats stats = tableDesc.getStats();
- long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
- stats.setNumBytes(volume);
- stats.setNumRows(1);
+ if (insertNode.hasTargetTable()) {
+ TableStats stats = tableDesc.getStats();
+ long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
+ stats.setNumBytes(volume);
+ stats.setNumRows(1);
- CatalogProtos.UpdateTableStatsProto.Builder builder = CatalogProtos.UpdateTableStatsProto.newBuilder();
- builder.setTableName(tableDesc.getName());
- builder.setStats(stats.getProto());
+ CatalogProtos.UpdateTableStatsProto.Builder builder = CatalogProtos.UpdateTableStatsProto.newBuilder();
+ builder.setTableName(tableDesc.getName());
+ builder.setStats(stats.getProto());
- catalog.updateTableStats(builder.build());
+ catalog.updateTableStats(builder.build());
- responseBuilder.setTableDesc(tableDesc.getProto());
- } else {
- TableStats stats = new TableStats();
- long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
- stats.setNumBytes(volume);
- stats.setNumRows(1);
-
- // Empty TableDesc
- List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>();
- CatalogProtos.TableDescProto tableDescProto = CatalogProtos.TableDescProto.newBuilder()
- .setTableName(nodeUniqName)
- .setMeta(CatalogProtos.TableProto.newBuilder().setStoreType("CSV").build())
- .setSchema(CatalogProtos.SchemaProto.newBuilder().addAllFields(columns).build())
- .setStats(stats.getProto())
- .build();
-
- responseBuilder.setTableDesc(tableDescProto);
- }
+ responseBuilder.setTableDesc(tableDesc.getProto());
+ } else {
+ TableStats stats = new TableStats();
+ long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
+ stats.setNumBytes(volume);
+ stats.setNumRows(1);
+
+ // Empty TableDesc
+ List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>();
+ CatalogProtos.TableDescProto tableDescProto = CatalogProtos.TableDescProto.newBuilder()
+ .setTableName(nodeUniqName)
+ .setMeta(CatalogProtos.TableProto.newBuilder().setStoreType("CSV").build())
+ .setSchema(CatalogProtos.SchemaProto.newBuilder().addAllFields(columns).build())
+ .setStats(stats.getProto())
+ .build();
+
+ responseBuilder.setTableDesc(tableDescProto);
+ }
- // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows.
- responseBuilder.setMaxRowNum(-1);
- responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
- responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+ // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only the number of inserted rows.
+ responseBuilder.setMaxRowNum(-1);
+ responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
}
public void executeDistributedQuery(QueryContext queryContext, Session session,
@@ -436,14 +442,17 @@ public class QueryExecutor {
SubmitQueryResponse.Builder responseBuilder) throws Exception {
LogicalRootNode rootNode = plan.getRootBlock().getRoot();
- String storeType = PlannerUtil.getStoreType(plan);
- if (storeType != null) {
- Tablespace sm = TableSpaceManager.getStorageManager(context.getConf(), storeType);
- StorageProperty storageProperty = sm.getStorageProperty();
- if (!storageProperty.isSupportsInsertInto()) {
+ TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, plan.getRootBlock().getRoot());
+ if (tableDesc != null) {
+
+ Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get();
+ StorageProperty storageProperty = space.getProperty();
+
+ if (!storageProperty.isInsertable()) {
throw new VerifyException("Inserting into non-file storage is not supported.");
}
- sm.beforeInsertOrCATS(rootNode.getChild());
+
+ space.prepareTable(rootNode.getChild());
}
context.getSystemMetrics().counter("Query", "numDMLQuery").inc();
hookManager.doHooks(queryContext, plan);
@@ -471,28 +480,15 @@ public class QueryExecutor {
}
}
- public static MasterPlan compileMasterPlan(LogicalPlan plan, QueryContext context, GlobalPlanner planner)
+ public MasterPlan compileMasterPlan(LogicalPlan plan, QueryContext context, GlobalPlanner planner)
throws Exception {
- String storeType = PlannerUtil.getStoreType(plan);
- if (storeType != null) {
- Tablespace sm = TableSpaceManager.getStorageManager(planner.getConf(), storeType);
- StorageProperty storageProperty = sm.getStorageProperty();
- if (storageProperty.isSortedInsert()) {
- String tableName = PlannerUtil.getStoreTableName(plan);
- LogicalRootNode rootNode = plan.getRootBlock().getRoot();
- TableDesc tableDesc = PlannerUtil.getTableDesc(planner.getCatalog(), rootNode.getChild());
- if (tableDesc == null) {
- throw new VerifyException("Can't get table meta data from catalog: " + tableName);
- }
- List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules(
- context, tableDesc);
- if (storageSpecifiedRewriteRules != null) {
- for (LogicalPlanRewriteRule eachRule: storageSpecifiedRewriteRules) {
- eachRule.rewrite(context, plan);
- }
- }
- }
+ LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+ TableDesc tableDesc = PlannerUtil.getTableDesc(planner.getCatalog(), rootNode.getChild());
+
+ if (tableDesc != null) {
+ Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get();
+ space.rewritePlan(context, plan);
}
MasterPlan masterPlan = new MasterPlan(QueryIdFactory.NULL_QUERY_ID, context, plan);