You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/12/28 07:36:37 UTC
[40/50] [abbrv] git commit: Solving conflicts
Solving conflicts
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/2495601c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/2495601c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/2495601c
Branch: refs/heads/DAG-execplan
Commit: 2495601c3c4b6407293339d5ea0aaabd6a8d8aba
Parents: dc24dbc 3a5a617
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Dec 26 18:08:45 2013 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Dec 26 18:08:45 2013 +0900
----------------------------------------------------------------------
BUILDING.txt | 2 +-
CHANGES.txt | 149 +++-
.../org/apache/tajo/algebra/Aggregation.java | 6 +-
.../org/apache/tajo/algebra/BooleanLiteral.java | 32 +
.../java/org/apache/tajo/algebra/CastExpr.java | 6 +-
.../org/apache/tajo/algebra/CreateTable.java | 34 +-
.../java/org/apache/tajo/algebra/DataType.java | 68 --
.../org/apache/tajo/algebra/DataTypeExpr.java | 68 ++
.../java/org/apache/tajo/algebra/DateValue.java | 63 ++
.../java/org/apache/tajo/algebra/DropTable.java | 8 +-
.../org/apache/tajo/algebra/LiteralValue.java | 12 +-
.../org/apache/tajo/algebra/NullLiteral.java | 31 +
.../java/org/apache/tajo/algebra/NullValue.java | 31 -
.../java/org/apache/tajo/algebra/OpType.java | 50 +-
.../org/apache/tajo/algebra/Projection.java | 6 +-
.../org/apache/tajo/algebra/SignedExpr.java | 38 +
.../java/org/apache/tajo/algebra/Target.java | 67 --
.../org/apache/tajo/algebra/TargetExpr.java | 67 ++
.../org/apache/tajo/algebra/TimeLiteral.java | 51 ++
.../java/org/apache/tajo/algebra/TimeValue.java | 78 ++
.../apache/tajo/algebra/TimestampLiteral.java | 57 ++
.../java/org/apache/tajo/algebra/TestExpr.java | 8 +-
tajo-catalog/pom.xml | 5 +
.../tajo/catalog/AbstractCatalogClient.java | 2 +-
.../org/apache/tajo/catalog/DDLBuilder.java | 20 +-
.../org/apache/tajo/catalog/FunctionDesc.java | 6 +-
.../java/org/apache/tajo/catalog/Schema.java | 57 +-
.../java/org/apache/tajo/catalog/TableDesc.java | 30 +-
.../tajo/catalog/partition/PartitionDesc.java | 194 +++++
.../tajo/catalog/partition/Partitions.java | 349 --------
.../tajo/catalog/statistics/ColumnStats.java | 5 +-
.../tajo/catalog/statistics/StatisticsUtil.java | 5 +-
.../tajo/catalog/statistics/TupleUtil.java | 63 --
.../src/main/proto/CatalogProtos.proto | 11 +-
.../org/apache/tajo/catalog/TestSchema.java | 26 +
tajo-catalog/tajo-catalog-drivers/pom.xml | 78 ++
.../tajo-catalog-drivers/tajo-hcatalog/pom.xml | 477 +++++++++++
.../tajo/catalog/store/HCatalogStore.java | 412 +++++++++
.../apache/tajo/catalog/store/HCatalogUtil.java | 169 ++++
.../tajo/catalog/store/DummyListener.java | 97 +++
.../tajo/catalog/store/TestHCatalogStore.java | 297 +++++++
tajo-catalog/tajo-catalog-server/pom.xml | 128 ---
.../org/apache/tajo/catalog/CatalogServer.java | 2 +-
.../tajo/catalog/store/AbstractDBStore.java | 42 +-
.../apache/tajo/catalog/store/DerbyStore.java | 42 +-
.../tajo/catalog/store/HCatalogStore.java | 396 ---------
.../apache/tajo/catalog/store/HCatalogUtil.java | 169 ----
.../org/apache/tajo/catalog/TestCatalog.java | 72 +-
.../org/apache/tajo/catalog/TestDBStore.java | 62 +-
tajo-common/pom.xml | 6 +
.../java/org/apache/tajo/TajoConstants.java | 2 +-
.../java/org/apache/tajo/conf/TajoConf.java | 11 +-
.../java/org/apache/tajo/datum/BitDatum.java | 6 +-
.../java/org/apache/tajo/datum/BlobDatum.java | 6 +-
.../org/apache/tajo/datum/BooleanDatum.java | 193 +++--
.../java/org/apache/tajo/datum/CharDatum.java | 8 +-
.../java/org/apache/tajo/datum/DateDatum.java | 170 ++++
.../main/java/org/apache/tajo/datum/Datum.java | 177 ++--
.../org/apache/tajo/datum/DatumFactory.java | 191 ++++-
.../java/org/apache/tajo/datum/Float4Datum.java | 6 +-
.../java/org/apache/tajo/datum/Float8Datum.java | 27 +-
.../java/org/apache/tajo/datum/Inet4Datum.java | 45 +-
.../java/org/apache/tajo/datum/Int2Datum.java | 18 +-
.../java/org/apache/tajo/datum/Int4Datum.java | 16 +-
.../java/org/apache/tajo/datum/Int8Datum.java | 7 +-
.../java/org/apache/tajo/datum/NullDatum.java | 79 +-
.../org/apache/tajo/datum/NumericDatum.java | 18 +-
.../java/org/apache/tajo/datum/TextDatum.java | 14 +-
.../java/org/apache/tajo/datum/TimeDatum.java | 163 ++++
.../org/apache/tajo/datum/TimestampDatum.java | 156 ++++
.../apache/tajo/json/ClassNameDeserializer.java | 44 -
.../java/org/apache/tajo/json/DatumAdapter.java | 33 +-
.../java/org/apache/tajo/json/GsonObject.java | 2 +-
.../org/apache/tajo/json/PathDeserializer.java | 40 -
.../org/apache/tajo/json/PathSerializer.java | 3 +-
.../main/java/org/apache/tajo/util/TUtil.java | 2 +-
tajo-common/src/main/proto/Security.proto | 6 +
tajo-common/src/main/proto/yarn_protos.proto | 177 ++--
.../org/apache/tajo/datum/TestBoolDatum.java | 106 ++-
.../org/apache/tajo/datum/TestDateDatum.java | 111 +++
.../org/apache/tajo/datum/TestInet4Datum.java | 16 +
.../org/apache/tajo/datum/TestTimeDatum.java | 111 +++
.../apache/tajo/datum/TestTimestampDatum.java | 113 +++
.../benchmark/tpch/customer.schema | 2 +-
tajo-core/tajo-core-backend/pom.xml | 17 +-
.../org/apache/tajo/engine/parser/SQLLexer.g4 | 10 +-
.../org/apache/tajo/engine/parser/SQLParser.g4 | 123 ++-
.../src/main/java/log4j.properties | 6 +-
.../java/org/apache/tajo/benchmark/TPCH.java | 40 +-
.../main/java/org/apache/tajo/cli/TajoCli.java | 55 +-
.../org/apache/tajo/client/QueryStatus.java | 6 -
.../java/org/apache/tajo/client/TajoClient.java | 38 +-
.../apache/tajo/engine/eval/AlgebraicUtil.java | 134 ++-
.../tajo/engine/eval/BasicEvalNodeVisitor.java | 25 +
.../org/apache/tajo/engine/eval/BinaryEval.java | 235 +++---
.../org/apache/tajo/engine/eval/CastEval.java | 37 +-
.../org/apache/tajo/engine/eval/EvalNode.java | 2 +-
.../tajo/engine/eval/EvalNodeVisitor2.java | 4 +
.../apache/tajo/engine/eval/EvalTreeUtil.java | 62 +-
.../org/apache/tajo/engine/eval/EvalType.java | 1 +
.../org/apache/tajo/engine/eval/FieldEval.java | 2 +-
.../org/apache/tajo/engine/eval/InEval.java | 18 +-
.../tajo/engine/eval/LikePredicateEval.java | 4 +
.../engine/eval/PatternMatchPredicateEval.java | 16 +-
.../org/apache/tajo/engine/eval/SignedEval.java | 114 +++
.../apache/tajo/engine/function/InCountry.java | 3 +-
.../function/datetime/ToCharTimestamp.java | 68 ++
.../engine/function/datetime/ToTimestamp.java | 44 +
.../tajo/engine/function/math/AbsDouble.java | 50 ++
.../tajo/engine/function/math/AbsFloat.java | 50 ++
.../tajo/engine/function/math/AbsInt.java | 50 ++
.../tajo/engine/function/math/AbsLong.java | 50 ++
.../apache/tajo/engine/function/math/Cbrt.java | 50 ++
.../tajo/engine/function/math/Degrees.java | 50 ++
.../apache/tajo/engine/function/math/Div.java | 63 ++
.../apache/tajo/engine/function/math/Exp.java | 50 ++
.../apache/tajo/engine/function/math/Mod.java | 63 ++
.../apache/tajo/engine/function/math/Pi.java | 45 +
.../apache/tajo/engine/function/math/Pow.java | 52 ++
.../tajo/engine/function/math/Radians.java | 50 ++
.../apache/tajo/engine/function/math/Round.java | 12 +-
.../apache/tajo/engine/function/math/Sign.java | 50 ++
.../apache/tajo/engine/function/math/Sqrt.java | 50 ++
.../tajo/engine/function/string/Ascii.java | 57 ++
.../apache/tajo/engine/function/string/Chr.java | 54 ++
.../tajo/engine/function/string/Decode.java | 79 ++
.../tajo/engine/function/string/Encode.java | 81 ++
.../function/string/HexStringConverter.java | 65 ++
.../tajo/engine/function/string/Locate.java | 94 +++
.../tajo/engine/function/string/Lpad.java | 78 ++
.../tajo/engine/function/string/QuoteIdent.java | 54 ++
.../tajo/engine/function/string/Rpad.java | 77 ++
.../tajo/engine/function/string/SplitPart.java | 15 +-
.../tajo/engine/function/string/Substr.java | 37 +-
.../tajo/engine/function/string/ToBin.java | 54 ++
.../tajo/engine/parser/HiveConverter.java | 16 +-
.../apache/tajo/engine/parser/SQLAnalyzer.java | 224 +++--
.../tajo/engine/planner/AlgebraVisitor.java | 65 +-
.../tajo/engine/planner/BaseAlgebraVisitor.java | 611 +++++++++++---
.../engine/planner/BasicLogicalPlanVisitor.java | 14 +-
.../tajo/engine/planner/LogicalOptimizer.java | 60 +-
.../apache/tajo/engine/planner/LogicalPlan.java | 17 +-
.../tajo/engine/planner/LogicalPlanVisitor.java | 2 +
.../tajo/engine/planner/LogicalPlanner.java | 272 ++++--
.../engine/planner/PhysicalPlannerImpl.java | 93 +-
.../apache/tajo/engine/planner/PlannerUtil.java | 73 +-
.../org/apache/tajo/engine/planner/Target.java | 4 +
.../tajo/engine/planner/global/DataChannel.java | 83 +-
.../engine/planner/global/ExecutionBlock.java | 5 +-
.../planner/global/ExecutionBlockCursor.java | 34 +-
.../planner/global/ExecutionBlockPID.java | 43 +
.../engine/planner/global/ExecutionPlan.java | 34 +-
.../engine/planner/global/GlobalPlanner.java | 637 +++++++-------
.../tajo/engine/planner/global/MasterPlan.java | 1 +
.../engine/planner/logical/CreateTableNode.java | 20 +-
.../engine/planner/logical/DropTableNode.java | 15 +-
.../tajo/engine/planner/logical/LimitNode.java | 2 +-
.../engine/planner/logical/LogicalNode.java | 4 +
.../tajo/engine/planner/logical/NodeType.java | 1 +
.../logical/PartitionedTableScanNode.java | 180 ++++
.../engine/planner/logical/RelationNode.java | 2 +-
.../tajo/engine/planner/logical/ScanNode.java | 19 +-
.../engine/planner/logical/StoreTableNode.java | 24 +-
.../planner/logical/TableSubQueryNode.java | 4 +-
.../planner/logical/join/FoundJoinOrder.java | 15 +-
.../join/GreedyHeuristicJoinOrderAlgorithm.java | 378 ++++-----
.../engine/planner/logical/join/JoinEdge.java | 17 +-
.../engine/planner/logical/join/JoinGraph.java | 39 +-
.../logical/join/JoinOrderAlgorithm.java | 16 +-
.../ColumnPartitionedTableStoreExec.java | 210 +++++
.../planner/physical/HashFullOuterJoinExec.java | 2 +-
.../planner/physical/HashLeftOuterJoinExec.java | 2 +-
.../planner/physical/IndexedStoreExec.java | 4 +-
.../planner/physical/NLLeftOuterJoinExec.java | 2 +-
.../planner/physical/PartitionedStoreExec.java | 4 +-
.../engine/planner/physical/PhysicalExec.java | 6 +-
.../engine/planner/physical/SeqScanExec.java | 71 +-
.../planner/rewrite/FilterPushDownRule.java | 47 +-
.../rewrite/PartitionedTableRewriter.java | 371 ++++++++
.../planner/rewrite/ProjectionPushDownRule.java | 32 +-
.../org/apache/tajo/engine/utils/TupleUtil.java | 106 +++
.../org/apache/tajo/jdbc/TajoResultSet.java | 4 +-
.../tajo/master/AbstractTaskScheduler.java | 2 +-
.../tajo/master/DefaultTaskScheduler.java | 19 +-
.../org/apache/tajo/master/GlobalEngine.java | 68 +-
.../apache/tajo/master/TajoAsyncDispatcher.java | 6 +-
.../apache/tajo/master/TajoContainerProxy.java | 4 -
.../java/org/apache/tajo/master/TajoMaster.java | 306 ++++++-
.../tajo/master/TajoMasterClientService.java | 51 +-
.../apache/tajo/master/TajoMasterService.java | 30 +-
.../apache/tajo/master/YarnContainerProxy.java | 68 +-
.../tajo/master/YarnTaskRunnerLauncherImpl.java | 2 +-
.../event/QueryDiagnosticsUpdateEvent.java | 2 +-
.../tajo/master/event/QueryEventType.java | 1 -
.../event/SubQueryDiagnosticsUpdateEvent.java | 34 +
.../tajo/master/event/SubQueryEventType.java | 2 +
.../tajo/master/event/TaskFatalErrorEvent.java | 12 +-
.../master/metrics/CatalogMetricsGaugeSet.java | 54 ++
.../metrics/WorkerResourceMetricsGaugeSet.java | 74 ++
.../apache/tajo/master/querymaster/Query.java | 96 ++-
.../master/querymaster/QueryInProgress.java | 6 +-
.../master/querymaster/QueryJobManager.java | 4 +-
.../tajo/master/querymaster/QueryMaster.java | 74 +-
.../querymaster/QueryMasterManagerService.java | 5 +-
.../master/querymaster/QueryMasterRunner.java | 2 +-
.../master/querymaster/QueryMasterTask.java | 30 +-
.../master/querymaster/QueryUnitAttempt.java | 25 +-
.../tajo/master/querymaster/Repartitioner.java | 2 +-
.../tajo/master/querymaster/SubQuery.java | 301 +++++--
.../tajo/master/querymaster/SubQueryState.java | 3 +-
.../tajo/master/rm/TajoWorkerContainer.java | 26 +-
.../tajo/master/rm/TajoWorkerContainerId.java | 5 +
.../master/rm/TajoWorkerResourceManager.java | 8 +-
.../master/rm/YarnRMContainerAllocator.java | 31 +-
.../tajo/master/rm/YarnTajoResourceManager.java | 63 +-
.../apache/tajo/util/ApplicationIdUtils.java | 2 +-
.../util/metrics/GroupNameMetricsFilter.java | 43 +
.../tajo/util/metrics/LogEventGaugeSet.java | 64 ++
.../tajo/util/metrics/MetricsFilterList.java | 43 +
.../tajo/util/metrics/RegexpMetricsFilter.java | 51 ++
.../tajo/util/metrics/TajoLogEventCounter.java | 86 ++
.../apache/tajo/util/metrics/TajoMetrics.java | 133 +++
.../tajo/util/metrics/TajoSystemMetrics.java | 213 +++++
.../util/metrics/reporter/GangliaReporter.java | 258 ++++++
.../reporter/MetricsConsoleReporter.java | 80 ++
.../MetricsConsoleScheduledReporter.java | 32 +
.../reporter/MetricsFileScheduledReporter.java | 57 ++
.../MetricsStreamScheduledReporter.java | 179 ++++
.../util/metrics/reporter/NullReporter.java | 31 +
.../metrics/reporter/TajoMetricsReporter.java | 232 +++++
.../reporter/TajoMetricsScheduledReporter.java | 206 +++++
.../tajo/webapp/QueryExecutorServlet.java | 17 +-
.../tajo/worker/AbstractResourceAllocator.java | 2 +-
.../org/apache/tajo/worker/DeletionService.java | 115 +++
.../tajo/worker/TajoResourceAllocator.java | 8 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 98 ++-
.../tajo/worker/TajoWorkerClientService.java | 13 +-
.../tajo/worker/TajoWorkerManagerService.java | 11 +-
.../main/java/org/apache/tajo/worker/Task.java | 6 +-
.../java/org/apache/tajo/worker/TaskRunner.java | 4 +-
.../apache/tajo/worker/TaskRunnerManager.java | 6 +-
.../tajo/worker/YarnResourceAllocator.java | 7 +-
.../src/main/proto/ClientProtos.proto | 8 +-
.../src/main/proto/InternalTypes.proto | 2 +-
.../main/proto/TajoMasterClientProtocol.proto | 3 +-
.../src/main/proto/TajoMasterProtocol.proto | 12 +-
.../src/main/proto/TajoWorkerProtocol.proto | 4 +-
.../src/main/resources/tajo-metrics.properties | 75 ++
.../org/apache/tajo/BackendTestingUtil.java | 3 -
.../org/apache/tajo/MiniTajoYarnCluster.java | 10 +-
.../org/apache/tajo/TajoTestingCluster.java | 3 +-
.../test/java/org/apache/tajo/TestTajoIds.java | 2 +-
.../org/apache/tajo/client/TestTajoClient.java | 119 ++-
.../apache/tajo/engine/eval/ExprTestBase.java | 8 +-
.../tajo/engine/eval/TestEvalTreeUtil.java | 24 +-
.../tajo/engine/eval/TestSQLDateTimeTypes.java | 38 +
.../tajo/engine/eval/TestSQLExpression.java | 115 +++
.../engine/function/TestDateTimeFunctions.java | 50 ++
.../tajo/engine/function/TestMathFunctions.java | 289 ++++++-
.../TestStringOperatorsAndFunctions.java | 208 ++++-
.../tajo/engine/parser/TestHiveConverter.java | 12 +-
.../tajo/engine/parser/TestSQLAnalyzer.java | 7 +-
.../engine/planner/TestLogicalOptimizer.java | 2 +-
.../tajo/engine/planner/TestLogicalPlan.java | 2 +-
.../planner/physical/TestBSTIndexExec.java | 2 +-
.../planner/physical/TestHashAntiJoinExec.java | 2 +-
.../planner/physical/TestHashSemiJoinExec.java | 2 +-
.../planner/physical/TestPhysicalPlanner.java | 16 +-
.../engine/planner/physical/TestSortExec.java | 2 +-
.../tajo/engine/query/TestCaseByCases.java | 82 ++
.../engine/query/TestCreateTableStatement.java | 57 ++
.../tajo/engine/query/TestGroupByQuery.java | 28 +
.../tajo/engine/query/TestInsertQuery.java | 10 +-
.../apache/tajo/engine/query/TestJoinQuery.java | 74 +-
.../tajo/engine/query/TestSelectQuery.java | 21 +-
.../tajo/engine/query/TestTablePartitions.java | 360 ++++++++
.../tajo/engine/query/TestTableSubQuery.java | 23 +
.../apache/tajo/engine/util/TestTupleUtil.java | 52 +-
.../tajo/master/TestExecutionBlockCursor.java | 8 +-
.../apache/tajo/master/TestGlobalPlanner.java | 2 +-
.../tajo/util/metrics/TestMetricsFilter.java | 52 ++
.../tajo/util/metrics/TestSystemMetrics.java | 133 +++
.../tajo/worker/TestRangeRetrieverHandler.java | 17 +-
.../src/test/queries/complex_union_1.sql | 4 +-
.../create_table_partition_by_column.sql | 2 +-
.../test/queries/create_table_various_types.sql | 48 ++
.../src/test/queries/tajo415_case.sql | 33 +
.../src/test/queries/tajo418_case.sql | 29 +
.../src/test/tpch/customer.tbl | 2 +
.../tajo/pullserver/FadvisedChunkedFile.java | 6 +-
.../tajo/pullserver/FadvisedFileRegion.java | 6 +-
.../tajo/pullserver/PullServerAuxService.java | 22 +-
.../tajo/pullserver/TajoPullServerService.java | 2 +-
.../tajo/storage/AbstractStorageManager.java | 18 +-
.../storage/BinarySerializeDeserialize.java | 256 ------
.../storage/BinarySerializerDeserializer.java | 257 ++++++
.../java/org/apache/tajo/storage/CSVFile.java | 82 +-
.../tajo/storage/CompressedSplitLineReader.java | 182 ++++
.../org/apache/tajo/storage/FileScanner.java | 24 +
.../java/org/apache/tajo/storage/LazyTuple.java | 10 +-
.../org/apache/tajo/storage/LineReader.java | 10 +-
.../java/org/apache/tajo/storage/RawFile.java | 85 +-
.../org/apache/tajo/storage/RowStoreUtil.java | 33 +-
.../tajo/storage/SerializeDeserialize.java | 34 -
.../tajo/storage/SerializerDeserializer.java | 34 +
.../apache/tajo/storage/SplitLineReader.java | 39 +
.../tajo/storage/TextSerializeDeserialize.java | 194 -----
.../storage/TextSerializerDeserializer.java | 209 +++++
.../org/apache/tajo/storage/rcfile/RCFile.java | 12 +-
.../apache/tajo/storage/v2/RCFileScanner.java | 11 +-
.../tajo/storage/TestCompressionStorages.java | 2 +-
.../org/apache/tajo/storage/TestLazyTuple.java | 22 +-
.../org/apache/tajo/storage/TestStorages.java | 4 +-
.../apache/tajo/storage/index/TestBSTIndex.java | 845 ++++++-------------
tajo-dist/pom.xml | 2 -
tajo-dist/src/main/bin/tajo | 24 +
tajo-dist/src/main/conf/tajo-env.sh | 3 +
tajo-project/pom.xml | 18 +-
.../src/site/markdown/tajo-0.2.0-doc.md | 2 +-
.../src/site/markdown/tajo-0.8.0-doc.md | 66 +-
320 files changed, 15486 insertions(+), 5160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2495601c/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2495601c/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2495601c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --cc tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index b10418c,5bba89b..bfc1d16
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@@ -47,7 -46,7 +47,6 @@@ public class LogicalPlan
/** it indicates the root block */
public static final String ROOT_BLOCK = VIRTUAL_TABLE_PREFIX + "ROOT";
public static final String NONAME_BLOCK_PREFIX = VIRTUAL_TABLE_PREFIX + "NONAME_";
-- private int nextPid = 0;
private Integer noNameBlockId = 0;
private Integer noNameColumnId = 0;
@@@ -60,6 -59,6 +59,16 @@@
/** planning and optimization log */
private List<String> planingHistory = Lists.newArrayList();
++ private PIDFactory pidFactory = new PIDFactory();
++
++ public static class PIDFactory {
++ private int nextPid = 0;
++
++ public int newPID() {
++ return nextPid++;
++ }
++ }
++
public LogicalPlan(LogicalPlanner planner) {
this.planner = planner;
}
@@@ -76,8 -75,8 +85,12 @@@
return block;
}
++ public PIDFactory getPidFactory() {
++ return pidFactory;
++ }
++
public int newPID() {
-- return nextPid++;
++ return pidFactory.newPID();
}
public QueryBlock newNoNameBlock() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2495601c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --cc tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 87daa8f,5120106..c3cf697
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@@ -73,58 -70,43 +73,86 @@@ public class PhysicalPlannerImpl implem
PhysicalExec execPlan;
try {
- execPlan = createPlanRecursive(context, logicalPlan);
- if (execPlan instanceof StoreTableExec
- || execPlan instanceof IndexedStoreExec
- || execPlan instanceof PartitionedStoreExec
- || execPlan instanceof ColumnPartitionedTableStoreExec) {
- return execPlan;
- } else if (context.getDataChannel() != null) {
- return buildOutputOperator(context, logicalPlan, execPlan);
- } else {
- return execPlan;
- }
+ plan = checkOutputOperator(context, plan);
+ execPlan = createPlanRecursive(context, plan, plan.getTerminalNode());
+
+ return execPlan;
++//=======
++// execPlan = createPlanRecursive(context, logicalPlan);
++// if (execPlan instanceof StoreTableExec
++// || execPlan instanceof IndexedStoreExec
++// || execPlan instanceof PartitionedStoreExec
++// || execPlan instanceof ColumnPartitionedTableStoreExec) {
++// return execPlan;
++// } else if (context.getDataChannel() != null) {
++// return buildOutputOperator(context, logicalPlan, execPlan);
++// } else {
++// return execPlan;
++// }
++//>>>>>>> 3a5a617c6bb1dd10d026ab0735f9031623a66d30
} catch (IOException ioe) {
throw new InternalException(ioe);
}
}
- private PhysicalExec buildOutputOperator(TaskAttemptContext context, LogicalNode plan,
- PhysicalExec execPlan) throws IOException {
- DataChannel channel = context.getDataChannel();
- StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
- if(context.isInterQuery()) storeTableNode.setStorageType(context.getDataChannel().getStoreType());
- storeTableNode.setInSchema(plan.getOutSchema());
- storeTableNode.setOutSchema(plan.getOutSchema());
- if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
- storeTableNode.setPartitions(channel.getPartitionType(), channel.getPartitionKey(), channel.getPartitionNum());
- } else {
- storeTableNode.setDefaultParition();
- }
- storeTableNode.setChild(plan);
+ @VisibleForTesting
+ public PhysicalExec createPlanWithoutMaterialize(final TaskAttemptContext context, ExecutionPlan plan)
+ throws InternalException {
+ PhysicalExec execPlan;
- PhysicalExec outExecPlan = createStorePlan(context, storeTableNode, execPlan);
- return outExecPlan;
+ try {
+ execPlan = createPlanRecursive(context, plan, plan.getTerminalNode());
+
+ return execPlan;
+ } catch (IOException ioe) {
+ throw new InternalException(ioe);
+ }
+ }
++//=======
++// private PhysicalExec buildOutputOperator(TaskAttemptContext context, LogicalNode plan,
++// PhysicalExec execPlan) throws IOException {
++// DataChannel channel = context.getDataChannel();
++// StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
++// if(context.isInterQuery()) storeTableNode.setStorageType(context.getDataChannel().getStoreType());
++// storeTableNode.setInSchema(plan.getOutSchema());
++// storeTableNode.setOutSchema(plan.getOutSchema());
++// if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
++// storeTableNode.setPartitions(channel.getPartitionType(), channel.getPartitionKey(), channel.getPartitionNum());
++// } else {
++// storeTableNode.setDefaultParition();
++//>>>>>>> 3a5a617c6bb1dd10d026ab0735f9031623a66d30
++// }
++// }
+
+ private ExecutionPlan checkOutputOperator(TaskAttemptContext context, ExecutionPlan plan) {
+ LogicalNode root = plan.getTerminalNode();
+ List<DataChannel> channels = context.getOutgoingChannels();
+ for (DataChannel channel : channels) {
- LogicalNode node = plan.getRootChild(channel.getSrcPID());
++ LogicalNode node = plan.getTopNodeFromPID(channel.getSrcPID());
+ if (node.getType() != NodeType.STORE) {
+ StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
+ storeTableNode.setStorageType(CatalogProtos.StoreType.CSV);
+ storeTableNode.setInSchema(channel.getSchema());
+ storeTableNode.setOutSchema(channel.getSchema());
+ if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
+ storeTableNode.setPartitions(channel.getPartitionType(), channel.getPartitionKey(), channel.getPartitionNum());
+ } else {
+ storeTableNode.setDefaultParition();
+ }
+
+ plan.remove(node, root);
+ plan.add(node, storeTableNode, Tag.SINGLE);
+ plan.add(storeTableNode, root, Tag.SINGLE);
+ channel.updateSrcPID(storeTableNode.getPID());
+ }
+ }
+ return plan;
}
- private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, LogicalNode logicalNode) throws IOException {
+ private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, ExecutionPlan plan, LogicalNode logicalNode) throws IOException {
PhysicalExec leftExec;
PhysicalExec rightExec;
+ PhysicalExec currentExec;
switch (logicalNode.getType()) {
@@@ -167,20 -135,14 +195,22 @@@
case TABLE_SUBQUERY: {
TableSubQueryNode subQueryNode = (TableSubQueryNode) logicalNode;
- leftExec = createPlanRecursive(ctx, subQueryNode.getSubQuery());
- return leftExec;
+ leftExec = createPlanRecursive(ctx, plan, subQueryNode.getSubQuery());
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, leftExec.getSchema(), leftExec, plan.getParentCount(logicalNode));
+ } else {
+ return leftExec;
+ }
- } case SCAN:
+ }
+ case PARTITIONS_SCAN:
+ case SCAN:
leftExec = createScanPlan(ctx, (ScanNode) logicalNode);
- return leftExec;
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, leftExec.getSchema(), leftExec, plan.getParentCount(logicalNode));
+ } else {
+ return leftExec;
+ }
case GROUP_BY:
GroupbyNode grpNode = (GroupbyNode) logicalNode;
@@@ -361,11 -292,10 +391,15 @@@
}
}
- private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode node,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
- String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
- String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
+ List<LogicalNode> childs = plan.getChilds(node);
- String [] leftLineage = PlannerUtil.getLineage(plan, childs.get(0));
- String [] rightLineage = PlannerUtil.getLineage(plan, childs.get(1));
++ String [] leftLineage = PlannerUtil.getRelationLineage(plan, childs.get(0));
++ String [] rightLineage = PlannerUtil.getRelationLineage(plan, childs.get(1));
++//=======
++// String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
++// String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
++//>>>>>>> 3a5a617c6bb1dd10d026ab0735f9031623a66d30
long leftSize = estimateSizeRecursive(context, leftLineage);
long rightSize = estimateSizeRecursive(context, rightLineage);
@@@ -435,9 -365,9 +469,12 @@@
}
}
- private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
- String [] rightLineage = PlannerUtil.getLineage(plan.getChild(join, 1));
- String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
++ String [] rightLineage = PlannerUtil.getRelationLineage(plan.getChild(join, 1));
++//=======
++// String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
++//>>>>>>> 3a5a617c6bb1dd10d026ab0735f9031623a66d30
long rightTableVolume = estimateSizeRecursive(context, rightLineage);
if (rightTableVolume < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)) {
@@@ -456,13 -386,13 +493,16 @@@
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
//if the left operand is small enough => implement it as a left outer hash join with exchanged operators (note:
// blocking, but merge join is blocking as well)
- String [] outerLineage4 = PlannerUtil.getLineage(plan.getChild(join, 0));
- String [] outerLineage4 = PlannerUtil.getRelationLineage(plan.getLeftChild());
++ String [] outerLineage4 = PlannerUtil.getRelationLineage(plan.getChild(join, 0));
++//=======
++// String [] outerLineage4 = PlannerUtil.getRelationLineage(plan.getLeftChild());
++//>>>>>>> 3a5a617c6bb1dd10d026ab0735f9031623a66d30
long outerSize = estimateSizeRecursive(context, outerLineage4);
if (outerSize < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)){
- LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
- return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
+ LOG.info("Right Outer Join (" + join.getPID() +") chooses [Hash Join].");
+ return new HashLeftOuterJoinExec(context, join, rightExec, leftExec);
} else {
- return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+ return createRightOuterMergeJoinPlan(context, join, leftExec, rightExec);
}
}
@@@ -524,11 -454,11 +564,15 @@@
}
}
- private HashFullOuterJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private HashFullOuterJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec)
throws IOException {
- String [] leftLineage = PlannerUtil.getLineage(plan.getChild(join, 0));
- String [] rightLineage = PlannerUtil.getLineage(plan.getChild(join, 1));
- String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
- String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
++ String [] leftLineage = PlannerUtil.getRelationLineage(plan.getChild(join, 0));
++ String [] rightLineage = PlannerUtil.getRelationLineage(plan.getChild(join, 1));
++//=======
++// String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
++// String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
++//>>>>>>> 3a5a617c6bb1dd10d026ab0735f9031623a66d30
long outerSize2 = estimateSizeRecursive(context, leftLineage);
long innerSize2 = estimateSizeRecursive(context, rightLineage);
@@@ -562,10 -492,10 +606,14 @@@
return new MergeFullOuterJoinExec(context, plan, outerSort3, innerSort3, sortSpecs3[0], sortSpecs3[1]);
}
- private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
- String [] leftLineage = PlannerUtil.getLineage(plan.getChild(join, 0));
- String [] rightLineage = PlannerUtil.getLineage(plan.getChild(join, 1));
- String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
- String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
++ String [] leftLineage = PlannerUtil.getRelationLineage(plan.getChild(join, 0));
++ String [] rightLineage = PlannerUtil.getRelationLineage(plan.getChild(join, 1));
++//=======
++// String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
++// String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
++//>>>>>>> 3a5a617c6bb1dd10d026ab0735f9031623a66d30
long outerSize2 = estimateSizeRecursive(context, leftLineage);
long innerSize2 = estimateSizeRecursive(context, rightLineage);
final long threshold = 1048576 * 128;
@@@ -782,7 -719,7 +843,10 @@@
return createInMemoryHashAggregation(context, groupbyNode, subOp);
}
- String [] outerLineage = PlannerUtil.getLineage(plan, plan.getChilds(groupbyNode).get(0));
- String [] outerLineage = PlannerUtil.getRelationLineage(groupbyNode.getChild());
++ String [] outerLineage = PlannerUtil.getRelationLineage(plan, plan.getChilds(groupbyNode).get(0));
++//=======
++// String [] outerLineage = PlannerUtil.getRelationLineage(groupbyNode.getChild());
++//>>>>>>> 3a5a617c6bb1dd10d026ab0735f9031623a66d30
long estimatedSize = estimateSizeRecursive(context, outerLineage);
final long threshold = conf.getLongVar(TajoConf.ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD);
@@@ -809,12 -746,12 +873,15 @@@
}
}
- return createBestSortPlan(context, sortNode, child);
+ return createBestSortPlan(context, plan, sortNode, child);
}
- public SortExec createBestSortPlan(TaskAttemptContext context, SortNode sortNode,
+ public SortExec createBestSortPlan(TaskAttemptContext context, ExecutionPlan plan, SortNode sortNode,
PhysicalExec child) throws IOException {
- String [] outerLineage = PlannerUtil.getLineage(plan, plan.getChilds(sortNode).get(0));
- String [] outerLineage = PlannerUtil.getRelationLineage(sortNode.getChild());
++ String [] outerLineage = PlannerUtil.getRelationLineage(plan, plan.getChilds(sortNode).get(0));
++//=======
++// String [] outerLineage = PlannerUtil.getRelationLineage(sortNode.getChild());
++//>>>>>>> 3a5a617c6bb1dd10d026ab0735f9031623a66d30
long estimatedSize = estimateSizeRecursive(context, outerLineage);
final long threshold = 1048576 * 2000;
@@@ -834,12 -771,11 +901,12 @@@
Preconditions.checkNotNull(ctx.getTable(annotation.getCanonicalName()),
"Error: There is no table matched to %s", annotation.getCanonicalName());
- FragmentProto [] fragmentProtos = ctx.getTables(annotation.getTableName());
+ FragmentProto[] fragmentProtos = ctx.getTables(annotation.getTableName());
List<FileFragment> fragments =
- FragmentConvertor.convert(ctx.getConf(), CatalogProtos.StoreType.CSV, fragmentProtos);
- FragmentConvertor.convert(ctx.getConf(), ctx.getDataChannel().getStoreType(), fragmentProtos);
++ FragmentConvertor.convert(ctx.getConf(), ctx.getIncomingChannels().get(0).getStoreType(), fragmentProtos);
- String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys());
+ String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0),
+ annotation.getSortKeys());
Path indexPath = new Path(sm.getTablePath(annotation.getTableName()), "index");
TupleComparator comp = new TupleComparator(annotation.getKeySchema(),
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2495601c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --cc tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 3b8bd08,d541680..e9bb071
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@@ -29,9 -28,8 +28,10 @@@ import org.apache.tajo.catalog.Schema
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.engine.eval.*;
-import org.apache.tajo.engine.planner.logical.*;
+ import org.apache.tajo.engine.exception.InvalidQueryException;
++import org.apache.tajo.engine.planner.LogicalPlan.PIDFactory;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
+import org.apache.tajo.engine.planner.logical.*;
- import org.apache.tajo.engine.exception.InvalidQueryException;
import org.apache.tajo.storage.TupleComparator;
import java.util.*;
@@@ -63,16 -65,40 +67,51 @@@ public class PlannerUtil
return tableNames;
}
- public static String [] getLineage(ExecutionPlan plan, LogicalNode node) {
++ public static String [] getRelationLineage(ExecutionPlan plan, LogicalNode node) {
+ LogicalNode [] scans = PlannerUtil.findAllNodes(plan, node, NodeType.SCAN);
+ String [] tableNames = new String[scans.length];
+ ScanNode scan;
+ for (int i = 0; i < scans.length; i++) {
+ scan = (ScanNode) scans[i];
+ tableNames[i] = scan.getCanonicalName();
+ }
+ return tableNames;
+ }
++
+ /**
+ * Get all scan nodes from a logical operator tree within a query block
+ *
+ * @param node a start node
+ * @return an array of relation names
+ */
+ public static Collection<String> getRelationLineageWithinQueryBlock(LogicalPlan plan, LogicalNode node)
+ throws PlanningException {
+ RelationFinderVisitor visitor = new RelationFinderVisitor();
+ visitor.visit(null, plan, node);
+ return visitor.getFoundRelations();
+ }
+
+ public static class RelationFinderVisitor extends BasicLogicalPlanVisitor<Object, LogicalNode> {
+ private Set<String> foundRelNameSet = Sets.newHashSet();
+
+ public Set<String> getFoundRelations() {
+ return foundRelNameSet;
+ }
+
+ @Override
+ public LogicalNode visitChild(Object context, LogicalPlan plan, LogicalNode node, Stack<LogicalNode> stack)
+ throws PlanningException {
+ if (node.getType() != NodeType.TABLE_SUBQUERY) {
+ super.visitChild(context, plan, node, stack);
+ }
+
+ if (node instanceof RelationNode) {
+ foundRelNameSet.add(((RelationNode) node).getCanonicalName());
+ }
+
+ return node;
+ }
+ }
/**
* Delete the logical node from a plan.
@@@ -344,21 -279,6 +383,21 @@@
return child;
}
+ public static SortNode[] transformSortTo2p(LogicalPlan plan, SortNode sort) {
+ Preconditions.checkArgument(sort != null);
+ SortNode parent = null, child = null;
+ try {
+ parent = sort;
+ child = (SortNode) sort.clone();
+ child.setPid(plan.newPID());
+ } catch (CloneNotSupportedException e) {
- LOG.warn(e);
++ throw new RuntimeException(e);
+ }
+
+ parent.setInSchema(child.getOutSchema());
+
+ return new SortNode[]{parent, child};
+ }
/**
* Find the top logical node matched to type from the given node
@@@ -866,9 -677,11 +905,11 @@@
return copy;
}
- public static <T extends LogicalNode> T clone(LogicalNode node) {
- public static <T extends LogicalNode> T clone(LogicalPlan plan, LogicalNode node) {
++ public static <T extends LogicalNode> T clone(PIDFactory pidFactory, LogicalNode node) {
try {
- return (T) node.clone();
+ T copy = (T) node.clone();
- copy.setPID(plan.newPID());
++ copy.setPID(pidFactory.newPID());
+ return copy;
} catch (CloneNotSupportedException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2495601c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --cc tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
index 26cedd7,556c7ff..034d71f
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@@ -27,8 -27,8 +27,10 @@@ import static org.apache.tajo.catalog.p
import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
public class DataChannel {
-- private ExecutionBlockId srcId;
-- private ExecutionBlockId targetId;
++// private ExecutionBlockId srcId;
++// private ExecutionBlockId targetId;
++ private ExecutionBlockPID srcId;
++ private ExecutionBlockPID targetId;
private TransmitType transmitType = TransmitType.PULL_TRANSMIT;
private PartitionType partitionType;
private Integer partitionNum = 1;
@@@ -36,21 -36,15 +38,20 @@@
private Schema schema;
- private StoreType storeType = StoreType.CSV;
-
- private Integer srcPID;
- private Integer targetPID;
+ private StoreType storeType = StoreType.RAW;
- public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId) {
- this.srcId = srcId;
- this.targetId = targetId;
+ public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, Integer srcPID, Integer targetPID) {
- this.srcId = srcId;
- this.targetId = targetId;
- this.srcPID = srcPID;
- this.targetPID = targetPID;
++// this.srcId = srcId;
++// this.targetId = targetId;
++ this.srcId = new ExecutionBlockPID(srcId, srcPID);
++ this.targetId = new ExecutionBlockPID(targetId, targetPID);
++// this.srcPID = srcPID;
++// this.targetPID = targetPID;
}
- public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, PartitionType partitionType) {
- this(srcId, targetId);
+ public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, Integer srcPID, Integer targetPID,
+ PartitionType partitionType) {
+ this(srcId, targetId, srcPID, targetPID);
this.partitionType = partitionType;
}
@@@ -67,8 -59,8 +68,8 @@@
}
public DataChannel(DataChannelProto proto) {
-- this.srcId = new ExecutionBlockId(proto.getSrcId());
-- this.targetId = new ExecutionBlockId(proto.getTargetId());
++// this.srcId = new ExecutionBlockId(proto.getSrcId());
++// this.targetId = new ExecutionBlockId(proto.getTargetId());
this.transmitType = proto.getTransmitType();
this.partitionType = proto.getPartitionType();
if (proto.hasSchema()) {
@@@ -85,20 -77,18 +86,37 @@@
if (proto.hasPartitionNum()) {
this.partitionNum = proto.getPartitionNum();
}
- if (proto.hasSrcPID()) {
- this.srcPID = proto.getSrcPID();
- }
- if (proto.hasTargetPID()) {
- this.targetPID = proto.getTargetPID();
++// if (proto.hasSrcPID()) {
++// this.srcPID = proto.getSrcPID();
++// }
++// if (proto.hasTargetPID()) {
++// this.targetPID = proto.getTargetPID();
++// }
++
++ Integer srcPID = proto.hasSrcPID() ? proto.getSrcPID() : null;
++ Integer targetPID = proto.hasTargetPID() ? proto.getTargetPID() : null;
++ this.srcId = new ExecutionBlockPID(new ExecutionBlockId(proto.getSrcId()), srcPID);
++ this.targetId = new ExecutionBlockPID(new ExecutionBlockId(proto.getTargetId()), targetPID);
+
+ if (proto.hasStoreType()) {
+ this.storeType = proto.getStoreType();
}
}
public ExecutionBlockId getSrcId() {
-- return srcId;
++ return srcId.getExecutionBlockId();
}
public ExecutionBlockId getTargetId() {
-- return targetId;
++ return targetId.getExecutionBlockId();
++ }
++
++ public ExecutionBlockPID getSrcExecutionPID() {
++ return srcId;
++ }
++
++ public ExecutionBlockPID getTargetExecutionPID() {
++ return srcId;
}
public PartitionType getPartitionType() {
@@@ -160,8 -150,8 +178,8 @@@
public DataChannelProto getProto() {
DataChannelProto.Builder builder = DataChannelProto.newBuilder();
-- builder.setSrcId(srcId.getProto());
-- builder.setTargetId(targetId.getProto());
++ builder.setSrcId(srcId.getExecutionBlockId().getProto());
++ builder.setTargetId(targetId.getExecutionBlockId().getProto());
if (transmitType != null) {
builder.setTransmitType(transmitType);
}
@@@ -177,11 -167,9 +195,15 @@@
if (partitionNum != null) {
builder.setPartitionNum(partitionNum);
}
- if (srcPID != null) {
- builder.setSrcPID(srcPID);
++ if (srcId.getPid() != null) {
++ builder.setSrcPID(srcId.getPid());
+ }
- if (targetPID != null) {
- builder.setTargetPID(targetPID);
++ if (targetId.getPid() != null) {
++ builder.setTargetPID(targetId.getPid());
++ }
+
+ if(storeType != null){
+ builder.setStoreType(storeType);
}
return builder.build();
}
@@@ -196,8 -184,8 +218,9 @@@
public String toString() {
StringBuilder sb = new StringBuilder();
-- sb.append("[").append(srcId.getQueryId()).append("] ");
- sb.append(srcId.getId()).append("."+srcPID).append(" => ").append(targetId.getId()).append("."+targetPID);
- sb.append(srcId.getId()).append(" => ").append(targetId.getId());
++ sb.append("[").append(srcId.getExecutionBlockId().getQueryId()).append("] ");
++ sb.append(srcId.getExecutionBlockId().getId()).append("."+srcId.getPid()).append(" => ")
++ .append(targetId.getExecutionBlockId().getId()).append("."+targetId.getPid());
sb.append(" (type=").append(partitionType);
if (hasPartitionKey()) {
sb.append(", key=");
@@@ -215,16 -203,4 +238,16 @@@
sb.append(")");
return sb.toString();
}
+
+ public void updateSrcPID(int srcPID) {
- this.srcPID = srcPID;
++ this.srcId.updatePid(srcPID);
+ }
+
+ public Integer getSrcPID() {
- return srcPID;
++ return srcId.getPid();
+ }
+
+ public Integer getTargetPID() {
- return targetPID;
++ return targetId.getPid();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2495601c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --cc tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index 7cfa478,4f3976e..f964302
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@@ -14,15 -14,11 +14,16 @@@
package org.apache.tajo.engine.planner.global;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.tajo.ExecutionBlockId;
++import org.apache.tajo.engine.planner.LogicalPlan.PIDFactory;
import org.apache.tajo.engine.planner.enforce.Enforcer;
-import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.LogicalRootNode;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
/**
* A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
@@@ -33,17 -29,16 +34,17 @@@
*/
public class ExecutionBlock {
private ExecutionBlockId executionBlockId;
- private LogicalNode plan = null;
- private StoreTableNode store = null;
- private List<ScanNode> scanlist = new ArrayList<ScanNode>();
+ private ExecutionPlan executionPlan;
private Enforcer enforcer = new Enforcer();
- private boolean hasJoinPlan;
- private boolean hasUnionPlan;
-
private Set<String> broadcasted = new HashSet<String>();
- public ExecutionBlock(ExecutionBlockId executionBlockId, LogicalRootNode rootNode) {
++ public ExecutionBlock(ExecutionBlockId executionBlockId, PIDFactory pidFactory, LogicalRootNode rootNode) {
+ this.executionBlockId = executionBlockId;
- this.executionPlan = new ExecutionPlan(rootNode);
++ this.executionPlan = new ExecutionPlan(pidFactory, rootNode);
+ }
+
+ @VisibleForTesting
public ExecutionBlock(ExecutionBlockId executionBlockId) {
this.executionBlockId = executionBlockId;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2495601c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockPID.java
----------------------------------------------------------------------
diff --cc tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockPID.java
index 0000000,0000000..d120057
new file mode 100644
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockPID.java
@@@ -1,0 -1,0 +1,43 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.engine.planner.global;
++
++import org.apache.tajo.ExecutionBlockId;
++
++public class ExecutionBlockPID {
++ private ExecutionBlockId executionBlockId;
++ private Integer pid;
++
++ public ExecutionBlockPID(ExecutionBlockId executionBlockId, Integer pid) {
++ this.executionBlockId = executionBlockId;
++ this.pid = pid;
++ }
++
++ public ExecutionBlockId getExecutionBlockId() {
++ return executionBlockId;
++ }
++
++ public void updatePid(Integer pid) {
++ this.pid = pid;
++ }
++
++ public Integer getPid() {
++ return pid;
++ }
++}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2495601c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
----------------------------------------------------------------------
diff --cc tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
index 1c02d70,0000000..ee42eef
mode 100644,000000..100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
@@@ -1,381 -1,0 +1,393 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.json.CoreGsonHelper;
++import org.apache.tajo.engine.planner.LogicalPlan.PIDFactory;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.global.ExecutionPlanEdge.Tag;
+import org.apache.tajo.engine.planner.graph.SimpleDirectedGraph;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.json.GsonObject;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+/**
+ * ExecutionPlan is a DAG of logical nodes.
+ * If there are two input sources, the plan should include Join or Union.
+ * The terminalNode is used as the start position of the traversal, because there are multiple output destinations.
+ */
+public class ExecutionPlan implements GsonObject {
++ @Expose private PIDFactory pidFactory;
+ @Expose private InputContext inputContext;
+ @Expose private boolean hasUnionPlan;
+ @Expose private boolean hasJoinPlan;
+ @Expose private LogicalRootNode terminalNode;
+ @Expose private Map<Integer, LogicalNode> vertices = new HashMap<Integer, LogicalNode>();
+ @Expose private SimpleDirectedGraph<Integer, ExecutionPlanEdge> graph
+ = new SimpleDirectedGraph<Integer, ExecutionPlanEdge>();
+
+ @VisibleForTesting
- public ExecutionPlan() {
-
++ public ExecutionPlan(PIDFactory pidFactory) {
++ this.pidFactory = pidFactory;
+ }
+
- public ExecutionPlan(LogicalRootNode terminalNode) {
- this.terminalNode = PlannerUtil.clone(terminalNode);
++ public ExecutionPlan(PIDFactory pidFactory, LogicalRootNode terminalNode) {
++ this(pidFactory);
++ this.terminalNode = PlannerUtil.clone(pidFactory, terminalNode);
+ }
+
+ public void setPlan(LogicalNode plan) {
+ this.clear();
+ this.addPlan(plan);
+ }
+
+ private void clear() {
+ for (ExecutionPlanEdge edge : graph.getEdgesAll()) {
+ graph.removeEdge(edge.getChildId(), edge.getParentId());
+ }
+ vertices.clear();
+ this.inputContext = null;
+ this.hasUnionPlan = false;
+ this.hasJoinPlan = false;
+ }
+
+ public void addPlan(LogicalNode plan) {
- LogicalNode current = PlannerUtil.clone(plan);
++ LogicalNode current = PlannerUtil.clone(pidFactory, plan);
+ if (current.getType() == NodeType.ROOT) {
+ terminalNode = (LogicalRootNode) current;
+ } else {
+ this.add(current, terminalNode, Tag.SINGLE);
+ terminalNode.setChild(current);
+ }
+ ExecutionPlanBuilder builder = new ExecutionPlanBuilder(this);
+ builder.visit(terminalNode);
+ }
+
+ public void add(LogicalNode child, LogicalNode parent, Tag tag) {
+ vertices.put(child.getPID(), child);
+ vertices.put(parent.getPID(), parent);
+ graph.addEdge(child.getPID(), parent.getPID(), new ExecutionPlanEdge(child, parent, tag));
+ }
+
+ public void setInputContext(InputContext contexts) {
+ this.inputContext = contexts;
+ }
+
+ public boolean hasJoinPlan() {
+ return this.hasJoinPlan;
+ }
+
+ public boolean hasUnionPlan() {
+ return this.hasUnionPlan;
+ }
+
+ public LogicalRootNode getTerminalNode() {
+ return terminalNode;
+ }
+
+ public InputContext getInputContext() {
+ return inputContext;
+ }
+
+ public String toString() {
+ return graph.toStringGraph(terminalNode.getPID());
+ }
+
+ public Tag getTag(LogicalNode child, LogicalNode parent) {
+ return graph.getEdge(child.getPID(), parent.getPID()).getTag();
+ }
+
+ public LogicalNode getChild(LogicalNode parent, Tag tag) {
+ List<ExecutionPlanEdge> incomingEdges = graph.getIncomingEdges(parent.getPID());
+ for (ExecutionPlanEdge inEdge : incomingEdges) {
+ if (inEdge.getTag() == tag) {
+ return vertices.get(inEdge.getChildId());
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public String toJson() {
+ return CoreGsonHelper.toJson(this, ExecutionPlan.class);
+ }
+
+ public Schema getOutSchema(int i) {
+ return vertices.get(graph.getChild(terminalNode.getPID(), i)).getOutSchema();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof ExecutionPlan) {
+ ExecutionPlan other = (ExecutionPlan) o;
+ boolean eq = this.hasJoinPlan == other.hasJoinPlan;
+ eq &= this.hasUnionPlan == other.hasUnionPlan;
+ eq &= this.terminalNode.equals(other.terminalNode);
+ eq &= this.inputContext.equals(other.inputContext);
+ if (!eq) {
+ return false;
+ }
+
+ ExecutionPlanComparator comparator = new ExecutionPlanComparator(this, other);
+ eq &= comparator.compare();
+ return eq;
+ }
+ return false;
+ }
+
- public LogicalNode getRootChild(int pid) {
++ public LogicalNode getTopNode(int index) {
++ return vertices.get(getTopNodePid(index));
++ }
++
++ public int getTopNodePid(int index) {
++ return graph.getChild(terminalNode.getPID(), index);
++ }
++
++ public LogicalNode getTopNodeFromPID(int pid) {
+ for (Integer childId : graph.getChilds(terminalNode.getPID())) {
+ if (childId == pid) {
+ return vertices.get(childId);
+ }
+ }
+ return null;
+ }
+
+ public int getChildCount(LogicalNode node) {
+ return graph.getChildCount(node.getPID());
+ }
+
+ public LogicalNode getChild(LogicalNode node, int i) {
+ return vertices.get(graph.getChild(node.getPID(), i));
+ }
+
+ public int getParentCount(LogicalNode node) {
+ return graph.getParentCount(node.getPID());
+ }
+
+ public LogicalNode getParent(LogicalNode node, int i) {
+ return vertices.get(graph.getParent(node.getPID(), i));
+ }
+
+ public List<LogicalNode> getChilds(LogicalNode node) {
+ List<LogicalNode> childs = new ArrayList<LogicalNode>();
+ for (Integer childId : graph.getChilds(node.getPID())) {
+ childs.add(vertices.get(childId));
+ }
+ return childs;
+ }
+
+ public void remove(LogicalNode child, LogicalNode parent) {
+ this.graph.removeEdge(child.getPID(), parent.getPID());
+ }
+
+ private static class LogicalNodeIdAndTag {
+ @Expose int id;
+ @Expose Tag tag;
+
+ public LogicalNodeIdAndTag(int id, Tag tag) {
+ this.id = id;
+ this.tag = tag;
+ }
+ }
+
+ public static class ExecutionPlanJsonHelper implements GsonObject {
+ @Expose private final boolean hasJoinPlan;
+ @Expose private final boolean hasUnionPlan;
+ @Expose private final InputContext inputContext;
+ @Expose private final LogicalRootNode terminalNode;
+ @Expose Map<Integer, LogicalNode> vertices = new HashMap<Integer, LogicalNode>();
+ @Expose Map<Integer, List<LogicalNodeIdAndTag>> adjacentList = new HashMap<Integer, List<LogicalNodeIdAndTag>>();
+
+ public ExecutionPlanJsonHelper(ExecutionPlan plan) {
+ this.hasJoinPlan = plan.hasJoinPlan;
+ this.hasUnionPlan = plan.hasUnionPlan;
+ this.inputContext = plan.getInputContext();
+ this.terminalNode = plan.terminalNode;
+ this.vertices.putAll(plan.vertices);
+ Collection<ExecutionPlanEdge> edges = plan.graph.getEdgesAll();
+ int parentId, childId;
+ List<LogicalNodeIdAndTag> adjacents;
+
+ // convert the graph to an adjacent list
+ for (ExecutionPlanEdge edge : edges) {
+ childId = edge.getChildId();
+ parentId = edge.getParentId();
+
+ if (adjacentList.containsKey(childId)) {
+ adjacents = adjacentList.get(childId);
+ } else {
+ adjacents = new ArrayList<LogicalNodeIdAndTag>();
+ adjacentList.put(childId, adjacents);
+ }
+ adjacents.add(new LogicalNodeIdAndTag(parentId, edge.getTag()));
+ }
+ }
+
+ @Override
+ public String toJson() {
+ return CoreGsonHelper.toJson(this, ExecutionPlanJsonHelper.class);
+ }
+
+ public ExecutionPlan toExecutionPlan() {
- ExecutionPlan plan = new ExecutionPlan(this.terminalNode);
++ // TODO: check that it works
++ ExecutionPlan plan = new ExecutionPlan(null, this.terminalNode);
+ plan.hasJoinPlan = this.hasJoinPlan;
+ plan.hasUnionPlan = this.hasUnionPlan;
+ plan.setInputContext(this.inputContext);
+ plan.vertices.putAll(this.vertices);
+
+ for (Entry<Integer, List<LogicalNodeIdAndTag>> e : this.adjacentList.entrySet()) {
+ LogicalNode child = this.vertices.get(e.getKey());
+ for (LogicalNodeIdAndTag idAndTag : e.getValue()) {
+ plan.add(child, this.vertices.get(idAndTag.id), idAndTag.tag);
+ }
+ }
+
+ return plan;
+ }
+ }
+
+ private static class ExecutionPlanComparator {
+ ExecutionPlan plan1;
+ ExecutionPlan plan2;
+ boolean equal = true;
+
+ public ExecutionPlanComparator(ExecutionPlan plan1, ExecutionPlan plan2) {
+ this.plan1 = plan1;
+ this.plan2 = plan2;
+ }
+
+ public boolean compare() {
+ Stack<Integer> s1 = new Stack<Integer>();
+ Stack<Integer> s2 = new Stack<Integer>();
+ s1.push(plan1.terminalNode.getPID());
+ s2.push(plan2.terminalNode.getPID());
+ return recursiveCompare(s1, s2);
+ }
+
+ private boolean recursiveCompare(Stack<Integer> s1, Stack<Integer> s2) {
+ Integer l1 = s1.pop();
+ Integer l2 = s2.pop();
+
+ if (l1.equals(l2)) {
+ if (plan1.graph.getChildCount(l1) == plan2.graph.getChildCount(l2)) {
+ if (plan1.graph.getChildCount(l1) > 0
+ && plan2.graph.getChildCount(l2) > 0) {
+ for (Integer child : plan1.graph.getChilds(l1)) {
+ s1.push(child);
+ }
+ for (Integer child : plan2.graph.getChilds(l2)) {
+ s2.push(child);
+ }
+ } else {
+ equal &= true;
+ return recursiveCompare(s1, s2);
+ }
+ } else {
+ equal = false;
+ }
+ } else {
+ equal = false;
+ }
+ return equal;
+ }
+ }
+
+ private static class ExecutionPlanBuilder implements LogicalNodeVisitor {
+ private ExecutionPlan plan;
+
+ public ExecutionPlanBuilder(ExecutionPlan plan) {
+ this.plan = plan;
+ }
+
+ @Override
+ public void visit(LogicalNode current) {
+ try {
+ Preconditions.checkArgument(current instanceof UnaryNode, "The current node should be an unary node");
+ visit(current, Tag.SINGLE);
+ } catch (PlanningException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void visit(LogicalNode current, Tag tag) throws PlanningException {
+ if (current instanceof UnaryNode) {
+ visitUnary((UnaryNode) current, tag);
+ } else if (current instanceof BinaryNode) {
+ visitBinary((BinaryNode) current, tag);
+ } else if (current instanceof ScanNode) {
+ visitScan((ScanNode) current, tag);
+ } else if (current instanceof TableSubQueryNode) {
+ visitTableSubQuery((TableSubQueryNode) current, tag);
+ }
+ }
+
+ private void visitScan(ScanNode node, Tag tag) throws PlanningException {
+ if (plan.inputContext == null) {
+ plan.inputContext = new InputContext();
+ }
+ plan.inputContext.addScanNode(node);
+ }
+
+ private void visitUnary(UnaryNode node, Tag tag) throws PlanningException {
+ if (node.getChild() != null) {
- LogicalNode child = PlannerUtil.clone(node.getChild());
++ LogicalNode child = PlannerUtil.clone(plan.pidFactory, node.getChild());
+ plan.add(child, node, tag);
+ node.setChild(null);
+ visit(child, tag);
+ }
+ }
+
+ private void visitBinary(BinaryNode node, Tag tag) throws PlanningException {
+ Preconditions.checkArgument(tag == Tag.SINGLE);
+
+ LogicalNode child;
+ if (node.getType() == NodeType.JOIN) {
+ plan.hasJoinPlan = true;
+ } else if (node.getType() == NodeType.UNION) {
+ plan.hasUnionPlan = true;
+ }
+ if (node.getLeftChild() != null) {
- child = PlannerUtil.clone(node.getLeftChild());
++ child = PlannerUtil.clone(plan.pidFactory, node.getLeftChild());
+ plan.add(child, node, Tag.LEFT);
+ node.setLeftChild(null);
+ visit(child, Tag.LEFT);
+ }
+ if (node.getRightChild() != null) {
- child = PlannerUtil.clone(node.getRightChild());
++ child = PlannerUtil.clone(plan.pidFactory, node.getRightChild());
+ plan.add(child, node, Tag.RIGHT);
+ node.setRightChild(null);
+ visit(child, Tag.RIGHT);
+ }
+ }
+
+ private void visitTableSubQuery(TableSubQueryNode node, Tag tag) throws PlanningException {
- LogicalNode child = PlannerUtil.clone(node.getSubQuery());
++ LogicalNode child = PlannerUtil.clone(plan.pidFactory, node.getSubQuery());
+ plan.add(child, node, tag);
+ visit(child, tag);
+ }
+ }
+}