You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/06/25 10:09:07 UTC
[15/15] tajo git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/tajo into index_support
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support
Conflicts:
tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/17dfe86c
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/17dfe86c
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/17dfe86c
Branch: refs/heads/index_support
Commit: 17dfe86ccacaf83a81d1f9b45d7c6b82409f0f82
Parents: 00a8c65 2ec307d
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Jun 25 17:08:23 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Jun 25 17:08:23 2015 +0900
----------------------------------------------------------------------
.travis.yml | 4 +-
CHANGES | 7 +-
NOTICE | 2 +-
.../org/apache/tajo/algebra/CreateTable.java | 14 +
.../org/apache/tajo/catalog/CatalogUtil.java | 7 +
.../org/apache/tajo/catalog/DDLBuilder.java | 5 +-
.../java/org/apache/tajo/catalog/TableDesc.java | 8 +-
.../org/apache/tajo/catalog/TestTableDesc.java | 2 +-
.../tajo/catalog/store/HiveCatalogStore.java | 2 +-
.../catalog/store/TestHiveCatalogStore.java | 25 +-
.../tajo/catalog/store/AbstractDBStore.java | 44 +--
.../src/main/resources/schemas/derby/derby.xml | 2 +-
.../main/resources/schemas/mariadb/mariadb.xml | 3 +-
.../src/main/resources/schemas/mysql/mysql.xml | 3 +-
.../main/resources/schemas/oracle/oracle.xml | 3 +-
.../resources/schemas/postgresql/postgresql.xml | 4 +-
.../cli/tsql/commands/DescTableCommand.java | 2 +-
.../main/java/org/apache/tajo/QueryVars.java | 4 +-
.../apache/tajo/storage/StorageConstants.java | 9 +
.../org/apache/tajo/storage/StorageService.java | 37 ++
.../java/org/apache/tajo/util/FileUtil.java | 14 +-
.../java/org/apache/tajo/util/KeyValueSet.java | 8 +-
.../java/org/apache/tajo/util/NumberUtil.java | 54 ---
.../org/apache/tajo/util/ReflectionUtil.java | 6 +-
.../org/apache/tajo/engine/parser/SQLParser.g4 | 8 +-
.../apache/tajo/engine/parser/SQLAnalyzer.java | 11 +-
.../engine/planner/PhysicalPlannerImpl.java | 11 +-
.../engine/planner/global/GlobalPlanner.java | 4 +-
.../planner/physical/BSTIndexScanExec.java | 2 +-
.../planner/physical/ColPartitionStoreExec.java | 2 +-
.../planner/physical/FilterScanIterator.java | 56 +++
.../planner/physical/FullScanIterator.java | 47 +++
.../engine/planner/physical/InsertRowsExec.java | 107 +++++
.../planner/physical/PhysicalPlanUtil.java | 2 +-
.../physical/RangeShuffleFileWriteExec.java | 2 +-
.../engine/planner/physical/ScanIterator.java | 33 ++
.../engine/planner/physical/SeqScanExec.java | 40 +-
.../engine/planner/physical/StoreTableExec.java | 23 +-
.../apache/tajo/engine/query/QueryContext.java | 71 +++-
.../org/apache/tajo/master/GlobalEngine.java | 17 +-
.../java/org/apache/tajo/master/TajoMaster.java | 22 +-
.../tajo/master/TajoMasterClientService.java | 3 +-
.../apache/tajo/master/exec/DDLExecutor.java | 58 ++-
.../exec/NonForwardQueryResultFileScanner.java | 2 +-
.../apache/tajo/master/exec/QueryExecutor.java | 253 ++++++------
.../master/exec/prehook/CreateTableHook.java | 8 +-
.../prehook/DistributedQueryHookManager.java | 2 +-
.../master/exec/prehook/InsertIntoHook.java | 8 +-
.../java/org/apache/tajo/querymaster/Query.java | 53 ++-
.../tajo/querymaster/QueryMasterTask.java | 150 ++-----
.../apache/tajo/querymaster/Repartitioner.java | 95 +++--
.../java/org/apache/tajo/querymaster/Stage.java | 12 +-
.../java/org/apache/tajo/querymaster/Task.java | 56 +--
.../tajo/webapp/QueryExecutorServlet.java | 2 +-
.../tajo/worker/ExecutionBlockContext.java | 31 +-
.../org/apache/tajo/worker/LegacyTaskImpl.java | 2 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 5 +-
.../java/org/apache/tajo/worker/TaskImpl.java | 2 +-
.../src/main/proto/TajoWorkerProtocol.proto | 16 +-
.../resources/webapps/admin/catalogview.jsp | 3 +-
.../src/main/resources/webapps/admin/index.jsp | 23 +-
.../org/apache/tajo/BackendTestingUtil.java | 2 +-
.../java/org/apache/tajo/QueryTestCaseBase.java | 39 +-
.../org/apache/tajo/TajoTestingCluster.java | 35 +-
.../org/apache/tajo/cli/tsql/TestTajoCli.java | 4 +-
.../org/apache/tajo/client/TestTajoClient.java | 18 +-
.../apache/tajo/engine/eval/ExprTestBase.java | 9 +-
.../tajo/engine/eval/TestEvalTreeUtil.java | 4 +-
.../engine/planner/TestLogicalOptimizer.java | 5 +-
.../tajo/engine/planner/TestLogicalPlan.java | 3 +-
.../tajo/engine/planner/TestLogicalPlanner.java | 92 +++--
.../tajo/engine/planner/TestPlannerUtil.java | 5 +-
.../planner/physical/TestBNLJoinExec.java | 15 +-
.../planner/physical/TestExternalSortExec.java | 6 +-
.../physical/TestFullOuterHashJoinExec.java | 29 +-
.../physical/TestFullOuterMergeJoinExec.java | 40 +-
.../planner/physical/TestHashAntiJoinExec.java | 15 +-
.../planner/physical/TestHashJoinExec.java | 14 +-
.../planner/physical/TestHashSemiJoinExec.java | 14 +-
.../physical/TestLeftOuterHashJoinExec.java | 31 +-
.../planner/physical/TestMergeJoinExec.java | 10 +-
.../engine/planner/physical/TestNLJoinExec.java | 14 +-
.../planner/physical/TestPhysicalPlanner.java | 57 ++-
.../physical/TestProgressExternalSortExec.java | 6 +-
.../physical/TestRightOuterHashJoinExec.java | 20 +-
.../physical/TestRightOuterMergeJoinExec.java | 36 +-
.../engine/planner/physical/TestSortExec.java | 18 +-
.../apache/tajo/engine/query/TestCTASQuery.java | 8 +-
.../tajo/engine/query/TestCreateTable.java | 8 +-
.../tajo/engine/query/TestHBaseTable.java | 274 +++++++------
.../tajo/engine/query/TestInsertQuery.java | 6 +-
.../apache/tajo/engine/query/TestJoinQuery.java | 6 +-
.../tajo/engine/query/TestTablePartitions.java | 28 +-
.../apache/tajo/ha/TestHAServiceHDFSImpl.java | 88 +++--
.../org/apache/tajo/jdbc/TestResultSet.java | 4 +-
.../tajo/master/TestExecutionBlockCursor.java | 12 +-
.../TestNonForwardQueryResultSystemScanner.java | 258 +-----------
.../apache/tajo/master/TestRepartitioner.java | 77 ++--
.../tajo/querymaster/TestIntermediateEntry.java | 24 +-
.../apache/tajo/querymaster/TestKillQuery.java | 10 +-
.../org/apache/tajo/storage/TestRowFile.java | 5 +-
.../results/TestHBaseTable/testCATS.result | 100 -----
.../results/TestHBaseTable/testCTAS.result | 100 +++++
.../testInsertIntoUsingPut.result | 4 +-
.../TestHBaseTable/testInsertValues1.result | 4 +
.../testGetClusterDetails.result | 4 +
.../testGetNextRowsForAggregateFunction.result | 3 +
.../testGetNextRowsForTable.result | 5 +
.../results/TestTajoCli/testDescTable.result | 8 +-
.../testDescTableForNestedSchema.result | 4 +-
.../src/main/conf/storage-site.json.template | 35 ++
tajo-docs/src/main/sphinx/index.rst | 1 +
tajo-docs/src/main/sphinx/storage_plugin.rst | 47 +++
tajo-docs/src/main/sphinx/table_management.rst | 1 +
.../sphinx/table_management/table_overview.rst | 7 +
.../sphinx/table_management/tablespaces.rst | 45 +++
.../java/org/apache/tajo/plan/LogicalPlan.java | 8 +
.../org/apache/tajo/plan/LogicalPlanner.java | 62 ++-
.../tajo/plan/logical/CreateTableNode.java | 44 +--
.../apache/tajo/plan/logical/InsertNode.java | 51 +--
.../plan/logical/PartitionedTableScanNode.java | 2 +-
.../org/apache/tajo/plan/logical/ScanNode.java | 2 +-
.../tajo/plan/logical/StoreTableNode.java | 27 ++
.../rewrite/rules/PartitionedTableRewriter.java | 4 +-
.../plan/serder/LogicalNodeDeserializer.java | 16 +-
.../tajo/plan/serder/LogicalNodeSerializer.java | 25 +-
.../org/apache/tajo/plan/util/PlannerUtil.java | 30 +-
tajo-plan/src/main/proto/Plan.proto | 11 +-
tajo-storage/tajo-storage-common/pom.xml | 12 +-
.../org/apache/tajo/storage/FormatProperty.java | 63 +++
.../org/apache/tajo/storage/MergeScanner.java | 5 +-
.../apache/tajo/storage/OldStorageManager.java | 234 +++++++++++
.../apache/tajo/storage/StorageProperty.java | 59 ++-
.../org/apache/tajo/storage/StorageUtil.java | 4 +-
.../apache/tajo/storage/TableSpaceManager.java | 238 -----------
.../org/apache/tajo/storage/Tablespace.java | 211 ++++++----
.../apache/tajo/storage/TablespaceManager.java | 390 +++++++++++++++++++
.../src/main/resources/storage-default.json | 20 +
tajo-storage/tajo-storage-hbase/pom.xml | 11 +
.../storage/hbase/AbstractHBaseAppender.java | 2 +-
.../storage/hbase/AddSortForInsertRewriter.java | 92 -----
.../tajo/storage/hbase/ColumnMapping.java | 17 +-
.../tajo/storage/hbase/HBaseFragment.java | 28 +-
.../tajo/storage/hbase/HBasePutAppender.java | 13 +-
.../apache/tajo/storage/hbase/HBaseScanner.java | 9 +-
.../tajo/storage/hbase/HBaseTablespace.java | 248 +++++++-----
.../storage/hbase/SortedInsertRewriter.java | 117 ++++++
.../src/main/proto/StorageFragmentProtos.proto | 15 +-
.../tajo/storage/hbase/TestColumnMapping.java | 2 +-
.../storage/hbase/TestHBaseStorageManager.java | 108 -----
.../tajo/storage/hbase/TestHBaseTableSpace.java | 134 +++++++
tajo-storage/tajo-storage-hdfs/pom.xml | 6 +-
.../org/apache/tajo/storage/FileAppender.java | 28 +-
.../org/apache/tajo/storage/FileTablespace.java | 237 +++++++----
.../tajo/storage/HashShuffleAppender.java | 87 ++---
.../storage/HashShuffleAppenderManager.java | 19 +-
.../tajo/storage/TestCompressionStorages.java | 4 +-
.../tajo/storage/TestDelimitedTextFile.java | 8 +-
.../tajo/storage/TestFileStorageManager.java | 233 -----------
.../apache/tajo/storage/TestFileSystems.java | 2 +-
.../apache/tajo/storage/TestFileTablespace.java | 250 ++++++++++++
.../org/apache/tajo/storage/TestLineReader.java | 8 +-
.../apache/tajo/storage/TestMergeScanner.java | 6 +-
.../org/apache/tajo/storage/TestStorages.java | 48 +--
.../apache/tajo/storage/index/TestBSTIndex.java | 58 +--
.../index/TestSingleCSVFileBSTIndex.java | 4 +-
.../apache/tajo/storage/json/TestJsonSerDe.java | 2 +-
167 files changed, 3860 insertions(+), 2682 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/CHANGES
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 3d09eb1,c6b9b41..d76ec92
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@@ -48,10 -49,7 +48,9 @@@ import org.apache.tajo.plan.LogicalPlan
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.FileTablespace;
+import org.apache.tajo.storage.StorageConstants;
- import org.apache.tajo.storage.TableSpaceManager;
- import org.apache.tajo.storage.Tablespace;
++import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
@@@ -1192,11 -1185,21 +1192,10 @@@ public class PhysicalPlannerImpl implem
Preconditions.checkNotNull(ctx.getTable(annotation.getCanonicalName()),
"Error: There is no table matched to %s", annotation.getCanonicalName());
- FragmentProto [] fragmentProtos = ctx.getTables(annotation.getTableName());
- List<FileFragment> fragments =
- FragmentConvertor.convert(ctx.getConf(), fragmentProtos);
-
- String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys());
- FileTablespace sm = (FileTablespace) TablespaceManager.get(fragments.get(0).getPath().toUri()).get();
- String dbName = CatalogUtil.extractQualifier(annotation.getTableName());
- String simpleName = CatalogUtil.extractSimpleName(annotation.getTableName());
- Path indexPath = new Path(new Path(sm.getTableUri(dbName, simpleName)), "index");
-
- TupleComparator comp = new BaseTupleComparator(annotation.getKeySchema(),
- annotation.getSortKeys());
- return new BSTIndexScanExec(ctx, annotation, fragments.get(0), new Path(indexPath, indexName),
- annotation.getKeySchema(), comp, annotation.getDatum());
-
+ FragmentProto [] fragments = ctx.getTables(annotation.getTableName());
-
+ Preconditions.checkState(fragments.length == 1);
+ return new BSTIndexScanExec(ctx, annotation, fragments[0], annotation.getIndexPath(),
+ annotation.getKeySchema(), annotation.getPredicates());
}
public static EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, LogicalNode node) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 34e6f5c,54abca8..521b6b9
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@@ -96,98 -67,12 +96,98 @@@ public class BSTIndexScanExec extends P
this.reader.open();
}
+ private static Schema mergeSubSchemas(Schema originalSchema, Schema subSchema, Target[] targets, EvalNode qual) {
+ Schema mergedSchema = new Schema();
+ Set<Column> qualAndTargets = TUtil.newHashSet();
+ qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(qual));
+ for (Target target : targets) {
+ qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(target.getEvalTree()));
+ }
+ for (Column column : originalSchema.getRootColumns()) {
+ if (subSchema.contains(column)
+ || qualAndTargets.contains(column)
+ || qualAndTargets.contains(column)) {
+ mergedSchema.addColumn(column);
+ }
+ }
+ return mergedSchema;
+ }
+
@Override
public void init() throws IOException {
+ Schema projected;
+
+ // in the case where projected column or expression are given
+ // the target can be an empty list.
+ if (plan.hasTargets()) {
+ projected = new Schema();
+ Set<Column> columnSet = new HashSet<Column>();
+
+ if (plan.hasQual()) {
+ columnSet.addAll(EvalTreeUtil.findUniqueColumns(qual));
+ }
+
+ for (Target t : plan.getTargets()) {
+ columnSet.addAll(EvalTreeUtil.findUniqueColumns(t.getEvalTree()));
+ }
+
+ for (Column column : inSchema.getAllColumns()) {
+ if (columnSet.contains(column)) {
+ projected.addColumn(column);
+ }
+ }
+
+ } else {
+ // no projected columns are given, meaning that all columns should be projected.
+ // TODO - this implicit rule makes code readability bad. So, we should remove it later
+ projected = outSchema;
+ }
+
+ initScanner(projected);
super.init();
progress = 0.0f;
- if (qual != null) {
- qual.bind(context.getEvalContext(), inSchema);
+
+ if (plan.hasQual()) {
+ if (fileScanner.isProjectable()) {
+ qual.bind(context.getEvalContext(), projected);
+ } else {
+ qual.bind(context.getEvalContext(), inSchema);
+ }
+ }
+ }
+
+ private void initScanner(Schema projected) throws IOException {
+
+ TableMeta meta;
+ try {
+ meta = (TableMeta) plan.getTableDesc().getMeta().clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException(e);
+ }
+
+ // set system default properties
+ PlannerUtil.applySystemDefaultToTableProperties(context.getQueryContext(), meta);
+
+ // Why should we check nullity? See https://issues.apache.org/jira/browse/TAJO-1422
+ if (fragment != null) {
+
+ Schema fileScanOutSchema = mergeSubSchemas(projected, keySchema, plan.getTargets(), qual);
+
- this.fileScanner = TableSpaceManager.getStorageManager(context.getConf(),
++ this.fileScanner = OldStorageManager.getStorageManager(context.getConf(),
+ plan.getTableDesc().getMeta().getStoreType())
+ .getSeekableScanner(plan.getTableDesc().getMeta(), plan.getPhysicalSchema(), fragment, fileScanOutSchema);
+ this.fileScanner.init();
+
+ // See the Scanner.isProjectable() method. Depending on the result of isProjectable(),
+ // the width of retrieved tuple is changed.
+ //
+ // If TRUE, the retrieved tuple will contain only projected fields.
+ // If FALSE, the retrieved tuple will contain projected fields and NullDatum for non-projected fields.
+ if (fileScanner.isProjectable()) {
+ this.projector = new Projector(context, projected, outSchema, plan.getTargets());
+ } else {
+ this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 422d034,37b497c..91fce57
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@@ -43,7 -43,7 +43,6 @@@ import org.apache.tajo.ipc.ClientProtos
import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.master.exec.DDLExecutor;
import org.apache.tajo.master.exec.QueryExecutor;
--import org.apache.tajo.session.Session;
import org.apache.tajo.plan.*;
import org.apache.tajo.plan.logical.InsertNode;
import org.apache.tajo.plan.logical.LogicalRootNode;
@@@ -53,10 -53,8 +52,10 @@@ import org.apache.tajo.plan.verifier.Lo
import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier;
import org.apache.tajo.plan.verifier.VerificationState;
import org.apache.tajo.plan.verifier.VerifyException;
- import org.apache.tajo.storage.Tablespace;
- import org.apache.tajo.storage.TableSpaceManager;
++import org.apache.tajo.session.Session;
+ import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.IPCUtil;
import java.io.IOException;
import java.sql.SQLException;
@@@ -95,9 -91,8 +92,9 @@@ public class GlobalEngine extends Abstr
try {
analyzer = new SQLAnalyzer();
preVerifier = new PreLogicalPlanVerifier(context.getCatalog());
- planner = new LogicalPlanner(context.getCatalog());
+ planner = new LogicalPlanner(context.getCatalog(), TablespaceManager.getInstance());
- optimizer = new LogicalOptimizer(context.getConf());
+ // Access path rewriter is enabled only in QueryMasterTask
+ optimizer = new LogicalOptimizer(context.getConf(), context.getCatalog());
annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index c6732b3,7dbe815..76bfb95
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@@ -835,12 -844,11 +835,11 @@@ public class TajoMasterClientService ex
TableDesc desc;
try {
desc = context.getGlobalEngine().getDDLExecutor().createTable(queryContext, request.getName(),
- meta.getStoreType(), schema,
- meta, path, true, partitionDesc, false);
+ null, meta.getStoreType(), schema, meta, path.toUri(), true, partitionDesc, false);
} catch (Exception e) {
return TableResponse.newBuilder()
- .setResultCode(ResultCode.ERROR)
- .setErrorMessage(e.getMessage()).build();
+ .setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, e.getMessage(), null))
+ .build();
}
return TableResponse.newBuilder()
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 862e6f7,5d42157..2e5584c
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@@ -28,11 -28,10 +28,8 @@@ import org.apache.tajo.QueryId
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoConstants;
--import org.apache.tajo.catalog.CatalogService;
- import org.apache.tajo.catalog.CatalogUtil;
--import org.apache.tajo.catalog.Schema;
--import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.exception.AlreadyExistsIndexException;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes;
@@@ -41,12 -40,13 +38,14 @@@ import org.apache.tajo.datum.DatumFacto
import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.physical.EvalExprExec;
- import org.apache.tajo.engine.planner.physical.StoreTableExec;
+ import org.apache.tajo.engine.planner.physical.InsertRowsExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.ClientProtos.ResultCode;
import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
- import org.apache.tajo.master.*;
+ import org.apache.tajo.master.QueryInfo;
+ import org.apache.tajo.master.QueryManager;
+ import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.exec.prehook.CreateTableHook;
import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
import org.apache.tajo.master.exec.prehook.InsertIntoHook;
@@@ -65,8 -61,8 +60,9 @@@ import org.apache.tajo.plan.function.py
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.verifier.VerifyException;
+ import org.apache.tajo.session.Session;
import org.apache.tajo.storage.*;
+import org.apache.tajo.util.IPCUtil;
import org.apache.tajo.util.ProtoUtil;
import org.apache.tajo.worker.TaskAttemptContext;
@@@ -422,42 -384,89 +391,89 @@@ public class QueryExecutor
fs.rename(eachFile.getPath(), targetFilePath);
}
}
+ }
- if (insertNode.hasTargetTable()) {
- TableStats stats = tableDesc.getStats();
- long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
- stats.setNumBytes(volume);
- stats.setNumRows(1);
-
- CatalogProtos.UpdateTableStatsProto.Builder builder = CatalogProtos.UpdateTableStatsProto.newBuilder();
- builder.setTableName(tableDesc.getName());
- builder.setStats(stats.getProto());
+ /**
+ * Insert row values
+ */
+ private void insertRowValues(QueryContext queryContext,
+ InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder) {
+ try {
+ String nodeUniqName = insertNode.getTableName() == null ? new Path(insertNode.getUri()).getName() :
+ insertNode.getTableName();
+ String queryId = nodeUniqName + "_" + System.currentTimeMillis();
+
+ URI finalOutputUri = insertNode.getUri();
+ Tablespace space = TablespaceManager.get(finalOutputUri).get();
+ TableMeta tableMeta = new TableMeta(insertNode.getStorageType(), insertNode.getOptions());
+ tableMeta.putOption(StorageConstants.INSERT_DIRECTLY, Boolean.TRUE.toString());
+
+ FormatProperty formatProperty = space.getFormatProperty(tableMeta);
+
+ TaskAttemptContext taskAttemptContext;
+ if (formatProperty.directInsertSupported()) { // if this format and storage supports direct insertion
+ taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, null);
+ taskAttemptContext.setOutputPath(new Path(finalOutputUri));
+
+ EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
+ InsertRowsExec exec = new InsertRowsExec(taskAttemptContext, insertNode, evalExprExec);
+
+ try {
+ exec.init();
+ exec.next();
+ } finally {
+ exec.close();
+ }
+ } else {
+ URI stagingSpaceUri = space.prepareStagingSpace(context.getConf(), queryId, queryContext, tableMeta);
+ Path stagingDir = new Path(stagingSpaceUri);
+ Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
- catalog.updateTableStats(builder.build());
+ taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, stagingDir);
+ taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000"));
+ insertRowsThroughStaging(taskAttemptContext, insertNode, new Path(finalOutputUri), stagingDir, stagingResultDir);
+ }
- responseBuilder.setTableDesc(tableDesc.getProto());
- } else {
+ // set insert stats (how many rows and bytes)
TableStats stats = new TableStats();
- long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
- stats.setNumBytes(volume);
- stats.setNumRows(1);
-
- // Empty TableDesc
- List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>();
- CatalogProtos.TableDescProto tableDescProto = CatalogProtos.TableDescProto.newBuilder()
- .setTableName(nodeUniqName)
- .setMeta(CatalogProtos.TableProto.newBuilder().setStoreType("CSV").build())
- .setSchema(CatalogProtos.SchemaProto.newBuilder().addAllFields(columns).build())
- .setStats(stats.getProto())
- .build();
-
- responseBuilder.setTableDesc(tableDescProto);
- }
+ stats.setNumBytes(taskAttemptContext.getResultStats().getNumBytes());
+ stats.setNumRows(taskAttemptContext.getResultStats().getNumRows());
+
+ if (insertNode.hasTargetTable()) {
+ CatalogProtos.UpdateTableStatsProto.Builder builder = CatalogProtos.UpdateTableStatsProto.newBuilder();
+ builder.setTableName(insertNode.getTableName());
+ builder.setStats(stats.getProto());
+
+ catalog.updateTableStats(builder.build());
+
+ TableDesc desc = new TableDesc(
+ insertNode.getTableName(),
+ insertNode.getTargetSchema(),
+ tableMeta,
+ finalOutputUri);
+ responseBuilder.setTableDesc(desc.getProto());
+
+ } else { // If INSERT INTO LOCATION
+
+ // Empty TableDesc
+ List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>();
+ CatalogProtos.TableDescProto tableDescProto = CatalogProtos.TableDescProto.newBuilder()
+ .setTableName(nodeUniqName)
+ .setMeta(CatalogProtos.TableProto.newBuilder().setStoreType("CSV").build())
+ .setSchema(CatalogProtos.SchemaProto.newBuilder().addAllFields(columns).build())
+ .setStats(stats.getProto())
+ .build();
+
+ responseBuilder.setTableDesc(tableDescProto);
+ }
- // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows.
- responseBuilder.setMaxRowNum(-1);
- responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
- responseBuilder.setResult(IPCUtil.buildOkRequestResult());
+ // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows.
+ responseBuilder.setMaxRowNum(-1);
+ responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
- responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
++ responseBuilder.setResult(IPCUtil.buildOkRequestResult());
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
}
public void executeDistributedQuery(QueryContext queryContext, Session session,
@@@ -502,47 -515,15 +522,34 @@@
}
}
+ private void checkIndexExistence(final QueryContext queryContext, final CreateIndexNode createIndexNode)
+ throws IOException {
+ String databaseName, simpleIndexName, qualifiedIndexName;
+ if (CatalogUtil.isFQTableName(createIndexNode.getIndexName())) {
+ String[] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName());
+ databaseName = splits[0];
+ simpleIndexName = splits[1];
+ qualifiedIndexName = createIndexNode.getIndexName();
+ } else {
+ databaseName = queryContext.getCurrentDatabase();
+ simpleIndexName = createIndexNode.getIndexName();
+ qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName);
+ }
+
+ if (catalog.existIndexByName(databaseName, simpleIndexName)) {
+ throw new AlreadyExistsIndexException(qualifiedIndexName);
+ }
+ }
+
- public static MasterPlan compileMasterPlan(LogicalPlan plan, QueryContext context, GlobalPlanner planner)
+ public MasterPlan compileMasterPlan(LogicalPlan plan, QueryContext context, GlobalPlanner planner)
throws Exception {
- String storeType = PlannerUtil.getStoreType(plan);
- if (storeType != null) {
- Tablespace sm = TableSpaceManager.getStorageManager(planner.getConf(), storeType);
- StorageProperty storageProperty = sm.getStorageProperty();
- if (storageProperty.isSortedInsert()) {
- String tableName = PlannerUtil.getStoreTableName(plan);
- LogicalRootNode rootNode = plan.getRootBlock().getRoot();
- TableDesc tableDesc = PlannerUtil.getTableDesc(planner.getCatalog(), rootNode.getChild());
- if (tableDesc == null) {
- throw new VerifyException("Can't get table meta data from catalog: " + tableName);
- }
- List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules(
- context, tableDesc);
- if (storageSpecifiedRewriteRules != null) {
- for (LogicalPlanRewriteRule eachRule: storageSpecifiedRewriteRules) {
- eachRule.rewrite(new LogicalPlanRewriteRuleContext(context, plan));
- }
- }
- }
+ LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+ TableDesc tableDesc = PlannerUtil.getTableDesc(planner.getCatalog(), rootNode.getChild());
+
+ if (tableDesc != null) {
+ Tablespace space = TablespaceManager.get(tableDesc.getUri()).get();
+ space.rewritePlan(context, plan);
}
MasterPlan masterPlan = new MasterPlan(QueryIdFactory.NULL_QUERY_ID, context, plan);
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index bb83b8c,9d5838d..23fa497
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@@ -30,10 -30,9 +30,11 @@@ import org.apache.hadoop.yarn.state.*
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
+ import org.apache.tajo.QueryVars;
import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.exception.CatalogException;
import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.TableDesc;
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index 0737422,1f5e7a3..550b1ee
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@@ -50,15 -40,16 +40,18 @@@ import org.apache.tajo.ipc.TajoWorkerPr
import org.apache.tajo.master.TajoContainerProxy;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
+ import org.apache.tajo.plan.LogicalOptimizer;
+ import org.apache.tajo.plan.LogicalPlan;
+ import org.apache.tajo.plan.LogicalPlanner;
import org.apache.tajo.plan.logical.LogicalNode;
+ import org.apache.tajo.plan.logical.LogicalRootNode;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.logical.ScanNode;
- import org.apache.tajo.plan.verifier.VerifyException;
+ import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.session.Session;
-import org.apache.tajo.storage.*;
++import org.apache.tajo.storage.FormatProperty;
+import org.apache.tajo.storage.Tablespace;
- import org.apache.tajo.storage.StorageProperty;
- import org.apache.tajo.storage.StorageUtil;
- import org.apache.tajo.storage.TableSpaceManager;
++import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.util.metrics.TajoMetrics;
import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
import org.apache.tajo.worker.AbstractResourceAllocator;
@@@ -315,36 -300,22 +302,21 @@@ public class QueryMasterTask extends Co
LOG.warn("Query already started");
return;
}
-
-
+ LOG.info(SessionVars.INDEX_ENABLED.keyname() + " : " + queryContext.getBool(SessionVars.INDEX_ENABLED));
CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
- LogicalPlanner planner = new LogicalPlanner(catalog);
+ LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
- LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
+ LogicalOptimizer optimizer = new LogicalOptimizer(systemConf, catalog);
Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
jsonExpr = null; // remove the possible OOM
+
plan = planner.createPlan(queryContext, expr);
+ optimizer.optimize(queryContext, plan);
- String storeType = PlannerUtil.getStoreType(plan);
- if (storeType != null) {
- sm = TableSpaceManager.getStorageManager(systemConf, storeType);
- StorageProperty storageProperty = sm.getStorageProperty();
- if (storageProperty.isSortedInsert()) {
- String tableName = PlannerUtil.getStoreTableName(plan);
- LogicalRootNode rootNode = plan.getRootBlock().getRoot();
- TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild());
- if (tableDesc == null) {
- throw new VerifyException("Can't get table meta data from catalog: " + tableName);
- }
- List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules(
- getQueryTaskContext().getQueryContext(), tableDesc);
- if (storageSpecifiedRewriteRules != null) {
- for (LogicalPlanRewriteRule eachRule: storageSpecifiedRewriteRules) {
- optimizer.addRuleAfterToJoinOpt(eachRule);
- }
- }
- }
- }
+ // when a given uri is null, TablespaceManager.get will return the default tablespace.
+ space = TablespaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get();
+ space.rewritePlan(queryContext, plan);
- optimizer.optimize(queryContext, plan);
+ initStagingDir();
for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.SCAN);
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index 9118e73,07a09ad..2c336a4
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@@ -44,7 -42,8 +43,6 @@@ import org.apache.tajo.plan.expr.EvalCo
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.serder.EvalNodeDeserializer;
import org.apache.tajo.plan.serder.EvalNodeSerializer;
-import org.apache.tajo.engine.query.QueryContext;
--import org.apache.tajo.catalog.SchemaUtil;
import org.apache.tajo.plan.serder.PlanProto;
import org.apache.tajo.plan.verifier.LogicalPlanVerifier;
import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier;
@@@ -66,9 -66,9 +65,7 @@@ import java.util.TimeZone
import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
--import static org.junit.Assert.assertEquals;
--import static org.junit.Assert.assertFalse;
--import static org.junit.Assert.fail;
++import static org.junit.Assert.*;
public class ExprTestBase {
private static TajoTestingCluster util;
@@@ -103,8 -103,8 +100,8 @@@
analyzer = new SQLAnalyzer();
preLogicalPlanVerifier = new PreLogicalPlanVerifier(cat);
- planner = new LogicalPlanner(cat);
+ planner = new LogicalPlanner(cat, TablespaceManager.getInstance());
- optimizer = new LogicalOptimizer(util.getConfiguration());
+ optimizer = new LogicalOptimizer(util.getConfiguration(), cat);
annotatedPlanVerifier = new LogicalPlanVerifier(util.getConfiguration(), cat);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
index 133dec1,afa3472..8042aef
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
@@@ -103,10 -103,10 +103,11 @@@ public class TestLogicalOptimizer
catalog.createFunction(funcDesc);
sqlAnalyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog);
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
- optimizer = new LogicalOptimizer(util.getConfiguration());
++ optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration());
+ optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index dbd6e6d,0f37763..0cd5ced
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@@ -881,31 -889,8 +889,31 @@@ public class TestLogicalPlanner
}
@Test
+ public final void testCreateIndexNode() throws PlanningException {
+ QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ Expr expr = sqlAnalyzer.parse(QUERIES[11]);
+ LogicalPlan rootNode = planner.createPlan(qc, expr);
+ LogicalNode plan = rootNode.getRootBlock().getRoot();
+ testJsonSerDerObject(plan);
+
+ LogicalRootNode root = (LogicalRootNode) plan;
+ assertEquals(NodeType.CREATE_INDEX, root.getChild().getType());
+ CreateIndexNode createIndexNode = root.getChild();
+
+ assertEquals(NodeType.PROJECTION, createIndexNode.getChild().getType());
+ ProjectionNode projNode = createIndexNode.getChild();
+
+ assertEquals(NodeType.SELECTION, projNode.getChild().getType());
+ SelectionNode selNode = projNode.getChild();
+
+ assertEquals(NodeType.SCAN, selNode.getChild().getType());
+ ScanNode scanNode = selNode.getChild();
+ assertEquals(CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "employee"), scanNode.getTableName());
+ }
+
+ @Test
public final void testAsterisk() throws CloneNotSupportedException, PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr expr = sqlAnalyzer.parse(QUERIES[13]);
LogicalPlan planNode = planner.createPlan(qc, expr);
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index 2178d5c,1b64a8f..0684ff6
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@@ -28,14 -28,14 +28,15 @@@ import org.apache.tajo.conf.TajoConf
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
--import org.apache.tajo.engine.planner.*;
++import org.apache.tajo.engine.planner.PhysicalPlanner;
++import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
import org.apache.tajo.engine.planner.enforce.Enforcer;
++import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.logical.LogicalNode;
--import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.CommonTestingUtil;
@@@ -130,8 -128,8 +131,8 @@@ public class TestHashAntiJoinExec
people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
catalog.createTable(people);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog);
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
- optimizer = new LogicalOptimizer(conf);
+ optimizer = new LogicalOptimizer(conf, catalog);
}
@After
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index 697624a,afa273b..cbf0739
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@@ -135,8 -133,8 +135,8 @@@ public class TestHashSemiJoinExec
people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
catalog.createTable(people);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog);
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
- optimizer = new LogicalOptimizer(conf);
+ optimizer = new LogicalOptimizer(conf, catalog);
}
@After
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 8fe03be,dff0cbe..cdfb8cc
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@@ -162,13 -162,12 +162,12 @@@ public class TestPhysicalPlanner
}
appender.flush();
appender.close();
+
+ defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
catalog.createTable(score);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog);
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
- optimizer = new LogicalOptimizer(conf);
-
- defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
+ optimizer = new LogicalOptimizer(conf, catalog);
-
masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
createLargeScoreTable();
@@@ -971,19 -970,14 +970,19 @@@
}
public final String [] createIndexStmt = {
- "create index idx_employee on employee using bst (name null first, empId desc)"
+ "create index idx_employee on employee using TWO_LEVEL_BIN_TREE (name null first, empId desc)"
};
- //@Test
+ @Test
public final void testCreateIndex() throws IOException, PlanningException {
FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
- new Path(employee.getPath()), Integer.MAX_VALUE);
+ new Path(employee.getUri()), Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testCreateIndex");
+ Path indexPath = StorageUtil.concatPath(TajoConf.getWarehouseDir(conf), "default/idx_employee");
+ if (sm.getFileSystem().exists(indexPath)) {
+ sm.getFileSystem().delete(indexPath, true);
+ }
+
TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
new FileFragment[] {frags[0]}, workDir);
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index b54b4d9,4690e71..a19ac34
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@@ -30,11 -30,11 +30,17 @@@ import org.apache.tajo.conf.TajoConf
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
--import org.apache.tajo.engine.planner.*;
++import org.apache.tajo.engine.planner.PhysicalPlanner;
++import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
++import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
++import org.apache.tajo.engine.planner.UniformRangePartition;
import org.apache.tajo.engine.planner.enforce.Enforcer;
--import org.apache.tajo.plan.*;
--import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.engine.query.QueryContext;
++import org.apache.tajo.plan.LogicalOptimizer;
++import org.apache.tajo.plan.LogicalPlan;
++import org.apache.tajo.plan.LogicalPlanner;
++import org.apache.tajo.plan.PlanningException;
++import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
@@@ -101,10 -100,9 +107,10 @@@ public class TestSortExec
tablePath.toUri());
catalog.createTable(desc);
+ queryContext = new QueryContext(conf);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog);
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
- optimizer = new LogicalOptimizer(conf);
+ optimizer = new LogicalOptimizer(conf, catalog);
}
public static String[] QUERIES = {
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index af093a8,7c61cc7..e5b8470
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@@ -80,10 -79,9 +79,9 @@@ public class TestExecutionBlockCursor
}
analyzer = new SQLAnalyzer();
- logicalPlanner = new LogicalPlanner(catalog);
+ logicalPlanner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
- optimizer = new LogicalOptimizer(conf);
+ optimizer = new LogicalOptimizer(conf, catalog);
- Tablespace sm = TableSpaceManager.getFileStorageManager(conf);
dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index 10c6836,1351716..e7f51a8
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@@ -37,7 -37,7 +37,10 @@@ import org.apache.tajo.engine.query.Tas
import org.apache.tajo.ipc.QueryCoordinatorProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
--import org.apache.tajo.master.event.*;
++import org.apache.tajo.master.event.QueryEvent;
++import org.apache.tajo.master.event.QueryEventType;
++import org.apache.tajo.master.event.StageEvent;
++import org.apache.tajo.master.event.StageEventType;
import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
@@@ -104,8 -105,8 +108,8 @@@ public class TestKillQuery
Session session = LocalTajoTestingUtility.createDummySession();
CatalogService catalog = cluster.getMaster().getCatalog();
- LogicalPlanner planner = new LogicalPlanner(catalog);
+ LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
- LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+ LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
Expr expr = analyzer.parse(queryStr);
LogicalPlan plan = planner.createPlan(defaultContext, expr);
@@@ -168,8 -169,8 +172,8 @@@
Session session = LocalTajoTestingUtility.createDummySession();
CatalogService catalog = cluster.getMaster().getCatalog();
- LogicalPlanner planner = new LogicalPlanner(catalog);
+ LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
- LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+ LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
Expr expr = analyzer.parse(queryStr);
LogicalPlan plan = planner.createPlan(defaultContext, expr);
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------
diff --cc tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
index 48b74c0,a2480c9..d32045f
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
@@@ -49,7 -48,9 +50,8 @@@ import org.apache.tajo.plan.nameresolve
import org.apache.tajo.plan.rewrite.rules.ProjectionPushDownRule;
import org.apache.tajo.plan.util.ExprFinder;
import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.catalog.SchemaUtil;
import org.apache.tajo.plan.verifier.VerifyException;
+ import org.apache.tajo.storage.StorageService;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.Pair;
import org.apache.tajo.util.StringUtils;
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-plan/src/main/proto/Plan.proto
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
index 0000000,ef33a8e..d12c6bd
mode 000000,100644..100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
@@@ -1,0 -1,250 +1,234 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.tajo.storage;
+
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.base.Preconditions;
+ import com.google.common.collect.Maps;
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.tajo.TaskAttemptId;
+ import org.apache.tajo.catalog.Schema;
+ import org.apache.tajo.catalog.TableMeta;
+ import org.apache.tajo.conf.TajoConf;
+ import org.apache.tajo.storage.fragment.Fragment;
+
+ import java.io.IOException;
+ import java.lang.reflect.Constructor;
+ import java.net.URI;
+ import java.util.Map;
+ import java.util.concurrent.ConcurrentHashMap;
+
+ /**
+ * It handles the available tablespaces and caches Tablespace instances.
+ */
+ public class OldStorageManager {
+ private static final Log LOG = LogFactory.getLog(OldStorageManager.class);
+
+ /**
+ * Cache of scanner handlers for each storage type.
+ */
+ protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE
+ = new ConcurrentHashMap<String, Class<? extends Scanner>>();
+ /**
+ * Cache of appender handlers for each storage type.
+ */
+ protected static final Map<String, Class<? extends Appender>> APPENDER_HANDLER_CACHE
+ = new ConcurrentHashMap<String, Class<? extends Appender>>();
+ private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
+ Configuration.class,
+ Schema.class,
+ TableMeta.class,
+ Fragment.class
+ };
+ private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
+ Configuration.class,
+ TaskAttemptId.class,
+ Schema.class,
+ TableMeta.class,
+ Path.class
+ };
+ /**
+ * Cache of Tablespace instances.
+ * Key is the storage type name ("hbase" or "hdfs") + "_" + the tablespace URI.
+ */
+ private static final Map<String, Tablespace> storageManagers = Maps.newHashMap();
+ /**
+ * Cache of constructors for each class. Pins the classes so they
+ * can't be garbage collected until ReflectionUtils can be collected.
+ */
+ protected static Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
+
+ /**
+ * Clears the constructor, scanner-handler and appender-handler caches as well as the Tablespace cache.
+ */
+ @VisibleForTesting
+ protected synchronized static void clearCache() {
+ CONSTRUCTOR_CACHE.clear();
+ SCANNER_HANDLER_CACHE.clear();
+ APPENDER_HANDLER_CACHE.clear();
+ storageManagers.clear();
+ }
+
+ /**
+ * Closes every cached Tablespace and then clears all caches.
+ * @throws java.io.IOException if closing a Tablespace fails
+ */
+ public static void shutdown() throws IOException {
+ synchronized(storageManagers) {
+ for (Tablespace eachTablespace : storageManagers.values()) {
+ eachTablespace.close();
+ }
+ }
+ clearCache();
+ }
+
+ /**
+ * Returns the Tablespace for the storeType, keyed by the URI of the warehouse directory's file system.
+ * NOTE(review): the null-uri fallback below is rejected by Preconditions.checkNotNull(uri) in the overload.
+ * @param tajoConf Tajo system property.
+ * @param storeType Storage type
+ * @return The cached or newly created Tablespace instance
+ * @throws IOException
+ */
+ public static Tablespace getStorageManager(TajoConf tajoConf, String storeType) throws IOException {
+ FileSystem fileSystem = TajoConf.getWarehouseDir(tajoConf).getFileSystem(tajoConf);
+ if (fileSystem != null) {
+ return getStorageManager(tajoConf, fileSystem.getUri(), storeType);
+ } else {
+ return getStorageManager(tajoConf, null, storeType);
+ }
+ }
+
+ /**
+ * Returns the cached Tablespace for the given uri and storeType, creating and initializing it on a cache miss.
+ * A storeType equal to "HBASE" (ignoring case) selects the "hbase" manager; any other value selects "hdfs".
+ * @param tajoConf Tajo system property.
+ * @param uri Key that can identify each storage manager(may be a path)
+ * @param storeType Storage type
+ * @return The cached or newly created Tablespace instance
+ * @throws IOException If no manager class is found for the resolved type name
+ */
+ public static synchronized Tablespace getStorageManager(
+ TajoConf tajoConf, URI uri, String storeType) throws IOException {
+ Preconditions.checkNotNull(tajoConf);
+ Preconditions.checkNotNull(uri);
+ Preconditions.checkNotNull(storeType);
+
+ String typeName;
+ if (storeType.equalsIgnoreCase("HBASE")) {
+ typeName = "hbase";
+ } else {
+ typeName = "hdfs";
+ }
+
+ synchronized (storageManagers) {
+ String storeKey = typeName + "_" + uri.toString();
+ Tablespace manager = storageManagers.get(storeKey);
+
+ if (manager == null) {
+ Class<? extends Tablespace> storageManagerClass =
+ tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, Tablespace.class);
+
+ if (storageManagerClass == null) {
+ throw new IOException("Unknown Storage Type: " + typeName);
+ }
+
+ try {
+ Constructor<? extends Tablespace> constructor =
+ (Constructor<? extends Tablespace>) CONSTRUCTOR_CACHE.get(storageManagerClass);
+ if (constructor == null) {
+ constructor = storageManagerClass.getDeclaredConstructor(TablespaceManager.TABLESPACE_PARAM);
+ constructor.setAccessible(true);
+ CONSTRUCTOR_CACHE.put(storageManagerClass, constructor);
+ }
+ manager = constructor.newInstance(new Object[]{"noname", uri});
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ manager.init(tajoConf);
+ storageManagers.put(storeKey, manager);
+ }
+
+ return manager;
+ }
+ }
+
+ /**
- * Returns Scanner instance.
- *
- * @param conf The system property
- * @param meta The table meta
- * @param schema The input schema
- * @param fragment The fragment for scanning
- * @param target The output schema
- * @return Scanner instance
- * @throws IOException
- */
- public static synchronized SeekableScanner getSeekableScanner(
- TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
- return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target);
- }
-
- /**
+ * Creates a scanner instance.
+ *
+ * @param theClass Concrete class of scanner
+ * @param conf System property
+ * @param schema Input schema
+ * @param meta Table meta data
+ * @param fragment The fragment for scanning
+ * @param <T>
+ * @return The scanner instance
+ */
+ public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta,
+ Fragment fragment) {
+ T result;
+ try {
+ Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+ if (meth == null) {
+ meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
+ meth.setAccessible(true);
+ CONSTRUCTOR_CACHE.put(theClass, meth);
+ }
+ result = meth.newInstance(new Object[]{conf, schema, meta, fragment});
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return result;
+ }
+
+ /**
+ * Creates an appender instance.
+ *
+ * @param theClass Concrete class of appender
+ * @param conf System property
+ * @param taskAttemptId Task id
+ * @param meta Table meta data
+ * @param schema Input schema
+ * @param workDir Working directory
+ * @param <T>
+ * @return The appender instance
+ */
+ public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TaskAttemptId taskAttemptId,
+ TableMeta meta, Schema schema, Path workDir) {
+ T result;
+ try {
+ Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+ if (meth == null) {
+ meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
+ meth.setAccessible(true);
+ CONSTRUCTOR_CACHE.put(theClass, meth);
+ }
+ result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, meta, workDir});
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return result;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
index c89f043,52e223d..0995e91
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
@@@ -245,22 -261,15 +261,30 @@@ public abstract class Tablespace
return scanner;
}
+ /**
+ * Returns an Appender instance for the given task. This implementation
+ * delegates directly to getAppender with the same arguments.
+ *
+ * @param queryContext Query property.
+ * @param taskAttemptId Task id.
+ * @param meta Table meta data.
+ * @param schema Input schema.
+ * @param workDir Working directory.
+ * @return Appender instance
+ * @throws IOException declared by the signature; presumably raised by getAppender - verify
+ */
+ public Appender getAppenderForInsertRow(OverridableConf queryContext,
+ TaskAttemptId taskAttemptId,
+ TableMeta meta,
+ Schema schema,
+ Path workDir) throws IOException {
+ return getAppender(queryContext, taskAttemptId, meta, schema, workDir);
+ }
+
/**
+ * Returns a SeekableScanner instance. The scanner is obtained from
+ * getScanner(meta, schema, fragment, target) and cast to SeekableScanner, so
+ * the call fails with a ClassCastException if that scanner is not seekable.
+ * The method is synchronized on this tablespace instance.
+ *
+ * @param meta The table meta
+ * @param schema The input schema
+ * @param fragment The fragment for scanning
+ * @param target The output schema
+ * @return Scanner instance
+ * @throws IOException
+ */
+ public synchronized SeekableScanner getSeekableScanner(TableMeta meta, Schema schema, FragmentProto fragment,
+ Schema target) throws IOException {
+ return (SeekableScanner)this.getScanner(meta, schema, fragment, target);
+ }
+
+ /**
* Returns Appender instance.
* @param queryContext Query property.
* @param taskAttemptId Task id.
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index 845c2d7,18bb7ed..bcb02c6
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@@ -49,13 -49,9 +49,10 @@@ import org.apache.tajo.plan.logical.Cre
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.logical.ScanNode;
- import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
++import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.Fragment;
- import org.apache.tajo.util.Bytes;
- import org.apache.tajo.util.BytesUtils;
- import org.apache.tajo.util.Pair;
- import org.apache.tajo.util.TUtil;
+ import org.apache.tajo.util.*;
import java.io.BufferedReader;
import java.io.IOException;
@@@ -1062,13 -1089,10 +1090,10 @@@ public class HBaseTablespace extends Ta
}
}
- public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException {
- if ("false".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) {
- List<LogicalPlanRewriteRule> rules = new ArrayList<LogicalPlanRewriteRule>();
- rules.add(new AddSortForInsertRewriter(tableDesc, getIndexColumns(tableDesc)));
- return rules;
- } else {
- return null;
+ @Override
+ public void rewritePlan(OverridableConf context, LogicalPlan plan) throws PlanningException {
- if (REWRITE_RULE.isEligible(context, plan)) {
- REWRITE_RULE.rewrite(context, plan);
++ if (REWRITE_RULE.isEligible(new LogicalPlanRewriteRuleContext(context, plan))) {
++ REWRITE_RULE.rewrite(new LogicalPlanRewriteRuleContext(context, plan));
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
index 0000000,ebf557e..b6143f3
mode 000000,100644..100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
@@@ -1,0 -1,116 +1,117 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.tajo.storage.hbase;
+
-import org.apache.tajo.OverridableConf;
+ import org.apache.tajo.catalog.Column;
+ import org.apache.tajo.catalog.Schema;
+ import org.apache.tajo.catalog.SortSpec;
+ import org.apache.tajo.plan.LogicalPlan;
+ import org.apache.tajo.plan.PlanningException;
+ import org.apache.tajo.plan.logical.*;
+ import org.apache.tajo.plan.logical.SortNode.SortPurpose;
+ import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
++import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
+ import org.apache.tajo.util.KeyValueSet;
+
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.List;
+
+ /**
+ * This rewrite rule injects a sort operation so that rows are written in
+ * ascending order of HBase row keys, as required by HFile.
+ */
+ public class SortedInsertRewriter implements LogicalPlanRewriteRule {
+
+ @Override
+ public String getName() {
+ return "SortedInsertRewriter";
+ }
+
+ /**
+ * Tells whether the sort must be injected into the plan.
+ *
+ * The rule applies only when the INSERT_PUT_MODE query option is "false"
+ * (the default used here) and the child of the root node is a CREATE_TABLE
+ * or an INSERT node.
+ *
+ * @param context Rewrite context carrying the query context and the plan
+ * @return true if this rule should rewrite the plan
+ */
+ @Override
- public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
- boolean hbaseMode = "false".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"));
- LogicalRootNode rootNode = plan.getRootBlock().getRoot();
++ public boolean isEligible(LogicalPlanRewriteRuleContext context) {
++ boolean hbaseMode = "false".equalsIgnoreCase(context.getQueryContext().get(HBaseStorageConstants.INSERT_PUT_MODE, "false"));
++ LogicalRootNode rootNode = context.getPlan().getRootBlock().getRoot();
+ LogicalNode node = rootNode.getChild();
+ // The parentheses matter: && binds tighter than ||, so without them an INSERT
+ // node would be eligible even when put mode is enabled.
+ return hbaseMode && (node.getType() == NodeType.CREATE_TABLE || node.getType() == NodeType.INSERT);
+ }
+
+ /**
+ * Collects the columns of the table schema that are mapped to the HBase row key.
+ *
+ * @param tableSchema Schema of the target table
+ * @param tableProperty Table properties used to build the column mapping
+ * @return The row key columns, in schema order (empty if none is mapped)
+ * @throws IOException declared by the signature; presumably raised while building the column mapping - verify
+ */
+ public static Column[] getIndexColumns(Schema tableSchema, KeyValueSet tableProperty) throws IOException {
+ List<Column> indexColumns = new ArrayList<Column>();
+
+ ColumnMapping columnMapping = new ColumnMapping(tableSchema, tableProperty);
+
+ boolean[] isRowKeys = columnMapping.getIsRowKeyMappings();
+ for (int i = 0; i < isRowKeys.length; i++) {
+ if (isRowKeys[i]) {
+ indexColumns.add(tableSchema.getColumn(i));
+ }
+ }
+
+ return indexColumns.toArray(new Column[]{});
+ }
+
+ /**
+ * Inserts a SortNode between the store node (the child of the root) and its
+ * child, sorting on the row key columns of the target table.
+ *
+ * @param context Rewrite context carrying the plan to rewrite
+ * @return The same plan instance, with the sort node added
+ * @throws PlanningException if the row key columns cannot be determined or a
+ * sort column cannot be found in the child's output schema
+ */
+ @Override
- public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws PlanningException {
++ public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws PlanningException {
++ LogicalPlan plan = context.getPlan();
+ LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+
+ StoreTableNode storeTable = rootNode.getChild();
+ Schema tableSchema = storeTable.getTableSchema();
+
+ Column[] sortColumns;
+ try {
+ sortColumns = getIndexColumns(tableSchema, storeTable.getOptions());
+ } catch (IOException e) {
+ throw new PlanningException(e);
+ }
+
+ // Positions of the row key columns within the table schema.
+ int[] sortColumnIndexes = new int[sortColumns.length];
+ for (int i = 0; i < sortColumns.length; i++) {
+ sortColumnIndexes[i] = tableSchema.getColumnId(sortColumns[i].getQualifiedName());
+ }
+
+ UnaryNode insertNode = rootNode.getChild();
+ LogicalNode childNode = insertNode.getChild();
+
+ Schema sortSchema = childNode.getOutSchema();
+ SortNode sortNode = plan.createNode(SortNode.class);
+ sortNode.setSortPurpose(SortPurpose.STORAGE_SPECIFIED);
+ sortNode.setInSchema(sortSchema);
+ sortNode.setOutSchema(sortSchema);
+
+ SortSpec[] sortSpecs = new SortSpec[sortColumns.length];
+
+ // NOTE(review): table-schema positions are used to index the child's output
+ // schema; this assumes both schemas list columns in the same order - confirm.
+ for (int i = 0; i < sortColumnIndexes.length; i++) {
+ Column sortColumn = sortSchema.getColumn(sortColumnIndexes[i]);
+ if (sortColumn == null) {
+ throw new PlanningException("Can't find proper sort column:" + sortColumns[i]);
+ }
+ sortSpecs[i] = new SortSpec(sortColumn, true, true);
+ }
+ sortNode.setSortSpecs(sortSpecs);
+
+ // Splice the sort node in between the store node and its former child.
+ sortNode.setChild(insertNode.getChild());
+ insertNode.setChild(sortNode);
+ plan.getRootBlock().registerNode(sortNode);
+
+ return plan;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/17dfe86c/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index 2fab8db,22fb607..f0a302a
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@@ -124,8 -125,7 +125,8 @@@ public class TestBSTIndex
creater.setLoadNum(LOAD_NUM);
creater.open();
- SeekableScanner scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
- SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++ SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+ getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
Tuple keyTuple;
@@@ -148,8 -148,7 +149,8 @@@
tuple = new VTuple(keySchema.size());
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + storeType + ".idx"), keySchema, comp);
reader.open();
- scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
- scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++ scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+ getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
for (int i = 0; i < TUPLE_NUM - 1; i++) {
@@@ -228,8 -227,7 +229,8 @@@
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"),
keySchema, comp);
reader.open();
- SeekableScanner scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
- SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++ SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+ getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
for (int i = 0; i < TUPLE_NUM - 1; i++) {
@@@ -292,8 -290,7 +293,8 @@@
creater.setLoadNum(LOAD_NUM);
creater.open();
- SeekableScanner scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
- SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++ SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+ getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
Tuple keyTuple;
@@@ -364,8 -362,7 +366,8 @@@
creater.setLoadNum(LOAD_NUM);
creater.open();
- SeekableScanner scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
- SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++ SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+ getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
Tuple keyTuple;
@@@ -388,8 -385,7 +390,8 @@@
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"),
keySchema, comp);
reader.open();
- scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
- scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++ scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+ getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
Tuple result;
@@@ -457,8 -453,7 +459,8 @@@
creater.setLoadNum(LOAD_NUM);
creater.open();
- SeekableScanner scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
- SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++ SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+ getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
Tuple keyTuple;
@@@ -481,8 -476,7 +483,8 @@@
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"),
keySchema, comp);
reader.open();
- scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
- scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++ scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+ getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
Tuple result;
@@@ -538,8 -533,7 +541,8 @@@
creater.setLoadNum(LOAD_NUM);
creater.open();
- SeekableScanner scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
- SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++ SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+ getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
Tuple keyTuple;
@@@ -564,8 -558,7 +567,8 @@@
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindMinValue_" + storeType + ".idx"),
keySchema, comp);
reader.open();
- scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
- scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++ scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+ getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
tuple.put(0, DatumFactory.createInt8(0));
@@@ -623,8 -617,7 +627,8 @@@
creater.setLoadNum(LOAD_NUM);
creater.open();
- SeekableScanner scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
- SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++ SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+ getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
Tuple keyTuple;
@@@ -729,8 -723,7 +734,8 @@@
creater.setLoadNum(LOAD_NUM);
creater.open();
- SeekableScanner scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
- SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++ SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+ getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
Tuple keyTuple;
@@@ -811,8 -805,7 +817,8 @@@
creater.setLoadNum(LOAD_NUM);
creater.open();
- SeekableScanner scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
- SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++ SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+ getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
Tuple keyTuple;
@@@ -837,8 -830,7 +843,8 @@@
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"),
keySchema, comp);
reader.open();
- scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
- scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++ scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+ getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
for (int i = (TUPLE_NUM - 1); i > 0; i--) {
@@@ -903,8 -895,7 +909,8 @@@
creater.setLoadNum(LOAD_NUM);
creater.open();
- SeekableScanner scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
- SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++ SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+ getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
Tuple keyTuple;
@@@ -932,8 -923,7 +938,8 @@@
assertEquals(keySchema, reader.getKeySchema());
assertEquals(comp, reader.getComparator());
- scanner = TableSpaceManager.getStorageManager(conf, meta.getStoreType()).
- scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
++ scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+ getSeekableScanner(meta, schema, tablet.getProto(), schema);
scanner.init();
Tuple result;