You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/12/28 07:36:37 UTC

[40/50] [abbrv] git commit: Solving conflicts

Solving conflicts


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/2495601c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/2495601c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/2495601c

Branch: refs/heads/DAG-execplan
Commit: 2495601c3c4b6407293339d5ea0aaabd6a8d8aba
Parents: dc24dbc 3a5a617
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Dec 26 18:08:45 2013 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Dec 26 18:08:45 2013 +0900

----------------------------------------------------------------------
 BUILDING.txt                                    |   2 +-
 CHANGES.txt                                     | 149 +++-
 .../org/apache/tajo/algebra/Aggregation.java    |   6 +-
 .../org/apache/tajo/algebra/BooleanLiteral.java |  32 +
 .../java/org/apache/tajo/algebra/CastExpr.java  |   6 +-
 .../org/apache/tajo/algebra/CreateTable.java    |  34 +-
 .../java/org/apache/tajo/algebra/DataType.java  |  68 --
 .../org/apache/tajo/algebra/DataTypeExpr.java   |  68 ++
 .../java/org/apache/tajo/algebra/DateValue.java |  63 ++
 .../java/org/apache/tajo/algebra/DropTable.java |   8 +-
 .../org/apache/tajo/algebra/LiteralValue.java   |  12 +-
 .../org/apache/tajo/algebra/NullLiteral.java    |  31 +
 .../java/org/apache/tajo/algebra/NullValue.java |  31 -
 .../java/org/apache/tajo/algebra/OpType.java    |  50 +-
 .../org/apache/tajo/algebra/Projection.java     |   6 +-
 .../org/apache/tajo/algebra/SignedExpr.java     |  38 +
 .../java/org/apache/tajo/algebra/Target.java    |  67 --
 .../org/apache/tajo/algebra/TargetExpr.java     |  67 ++
 .../org/apache/tajo/algebra/TimeLiteral.java    |  51 ++
 .../java/org/apache/tajo/algebra/TimeValue.java |  78 ++
 .../apache/tajo/algebra/TimestampLiteral.java   |  57 ++
 .../java/org/apache/tajo/algebra/TestExpr.java  |   8 +-
 tajo-catalog/pom.xml                            |   5 +
 .../tajo/catalog/AbstractCatalogClient.java     |   2 +-
 .../org/apache/tajo/catalog/DDLBuilder.java     |  20 +-
 .../org/apache/tajo/catalog/FunctionDesc.java   |   6 +-
 .../java/org/apache/tajo/catalog/Schema.java    |  57 +-
 .../java/org/apache/tajo/catalog/TableDesc.java |  30 +-
 .../tajo/catalog/partition/PartitionDesc.java   | 194 +++++
 .../tajo/catalog/partition/Partitions.java      | 349 --------
 .../tajo/catalog/statistics/ColumnStats.java    |   5 +-
 .../tajo/catalog/statistics/StatisticsUtil.java |   5 +-
 .../tajo/catalog/statistics/TupleUtil.java      |  63 --
 .../src/main/proto/CatalogProtos.proto          |  11 +-
 .../org/apache/tajo/catalog/TestSchema.java     |  26 +
 tajo-catalog/tajo-catalog-drivers/pom.xml       |  78 ++
 .../tajo-catalog-drivers/tajo-hcatalog/pom.xml  | 477 +++++++++++
 .../tajo/catalog/store/HCatalogStore.java       | 412 +++++++++
 .../apache/tajo/catalog/store/HCatalogUtil.java | 169 ++++
 .../tajo/catalog/store/DummyListener.java       |  97 +++
 .../tajo/catalog/store/TestHCatalogStore.java   | 297 +++++++
 tajo-catalog/tajo-catalog-server/pom.xml        | 128 ---
 .../org/apache/tajo/catalog/CatalogServer.java  |   2 +-
 .../tajo/catalog/store/AbstractDBStore.java     |  42 +-
 .../apache/tajo/catalog/store/DerbyStore.java   |  42 +-
 .../tajo/catalog/store/HCatalogStore.java       | 396 ---------
 .../apache/tajo/catalog/store/HCatalogUtil.java | 169 ----
 .../org/apache/tajo/catalog/TestCatalog.java    |  72 +-
 .../org/apache/tajo/catalog/TestDBStore.java    |  62 +-
 tajo-common/pom.xml                             |   6 +
 .../java/org/apache/tajo/TajoConstants.java     |   2 +-
 .../java/org/apache/tajo/conf/TajoConf.java     |  11 +-
 .../java/org/apache/tajo/datum/BitDatum.java    |   6 +-
 .../java/org/apache/tajo/datum/BlobDatum.java   |   6 +-
 .../org/apache/tajo/datum/BooleanDatum.java     | 193 +++--
 .../java/org/apache/tajo/datum/CharDatum.java   |   8 +-
 .../java/org/apache/tajo/datum/DateDatum.java   | 170 ++++
 .../main/java/org/apache/tajo/datum/Datum.java  | 177 ++--
 .../org/apache/tajo/datum/DatumFactory.java     | 191 ++++-
 .../java/org/apache/tajo/datum/Float4Datum.java |   6 +-
 .../java/org/apache/tajo/datum/Float8Datum.java |  27 +-
 .../java/org/apache/tajo/datum/Inet4Datum.java  |  45 +-
 .../java/org/apache/tajo/datum/Int2Datum.java   |  18 +-
 .../java/org/apache/tajo/datum/Int4Datum.java   |  16 +-
 .../java/org/apache/tajo/datum/Int8Datum.java   |   7 +-
 .../java/org/apache/tajo/datum/NullDatum.java   |  79 +-
 .../org/apache/tajo/datum/NumericDatum.java     |  18 +-
 .../java/org/apache/tajo/datum/TextDatum.java   |  14 +-
 .../java/org/apache/tajo/datum/TimeDatum.java   | 163 ++++
 .../org/apache/tajo/datum/TimestampDatum.java   | 156 ++++
 .../apache/tajo/json/ClassNameDeserializer.java |  44 -
 .../java/org/apache/tajo/json/DatumAdapter.java |  33 +-
 .../java/org/apache/tajo/json/GsonObject.java   |   2 +-
 .../org/apache/tajo/json/PathDeserializer.java  |  40 -
 .../org/apache/tajo/json/PathSerializer.java    |   3 +-
 .../main/java/org/apache/tajo/util/TUtil.java   |   2 +-
 tajo-common/src/main/proto/Security.proto       |   6 +
 tajo-common/src/main/proto/yarn_protos.proto    | 177 ++--
 .../org/apache/tajo/datum/TestBoolDatum.java    | 106 ++-
 .../org/apache/tajo/datum/TestDateDatum.java    | 111 +++
 .../org/apache/tajo/datum/TestInet4Datum.java   |  16 +
 .../org/apache/tajo/datum/TestTimeDatum.java    | 111 +++
 .../apache/tajo/datum/TestTimestampDatum.java   | 113 +++
 .../benchmark/tpch/customer.schema              |   2 +-
 tajo-core/tajo-core-backend/pom.xml             |  17 +-
 .../org/apache/tajo/engine/parser/SQLLexer.g4   |  10 +-
 .../org/apache/tajo/engine/parser/SQLParser.g4  | 123 ++-
 .../src/main/java/log4j.properties              |   6 +-
 .../java/org/apache/tajo/benchmark/TPCH.java    |  40 +-
 .../main/java/org/apache/tajo/cli/TajoCli.java  |  55 +-
 .../org/apache/tajo/client/QueryStatus.java     |   6 -
 .../java/org/apache/tajo/client/TajoClient.java |  38 +-
 .../apache/tajo/engine/eval/AlgebraicUtil.java  | 134 ++-
 .../tajo/engine/eval/BasicEvalNodeVisitor.java  |  25 +
 .../org/apache/tajo/engine/eval/BinaryEval.java | 235 +++---
 .../org/apache/tajo/engine/eval/CastEval.java   |  37 +-
 .../org/apache/tajo/engine/eval/EvalNode.java   |   2 +-
 .../tajo/engine/eval/EvalNodeVisitor2.java      |   4 +
 .../apache/tajo/engine/eval/EvalTreeUtil.java   |  62 +-
 .../org/apache/tajo/engine/eval/EvalType.java   |   1 +
 .../org/apache/tajo/engine/eval/FieldEval.java  |   2 +-
 .../org/apache/tajo/engine/eval/InEval.java     |  18 +-
 .../tajo/engine/eval/LikePredicateEval.java     |   4 +
 .../engine/eval/PatternMatchPredicateEval.java  |  16 +-
 .../org/apache/tajo/engine/eval/SignedEval.java | 114 +++
 .../apache/tajo/engine/function/InCountry.java  |   3 +-
 .../function/datetime/ToCharTimestamp.java      |  68 ++
 .../engine/function/datetime/ToTimestamp.java   |  44 +
 .../tajo/engine/function/math/AbsDouble.java    |  50 ++
 .../tajo/engine/function/math/AbsFloat.java     |  50 ++
 .../tajo/engine/function/math/AbsInt.java       |  50 ++
 .../tajo/engine/function/math/AbsLong.java      |  50 ++
 .../apache/tajo/engine/function/math/Cbrt.java  |  50 ++
 .../tajo/engine/function/math/Degrees.java      |  50 ++
 .../apache/tajo/engine/function/math/Div.java   |  63 ++
 .../apache/tajo/engine/function/math/Exp.java   |  50 ++
 .../apache/tajo/engine/function/math/Mod.java   |  63 ++
 .../apache/tajo/engine/function/math/Pi.java    |  45 +
 .../apache/tajo/engine/function/math/Pow.java   |  52 ++
 .../tajo/engine/function/math/Radians.java      |  50 ++
 .../apache/tajo/engine/function/math/Round.java |  12 +-
 .../apache/tajo/engine/function/math/Sign.java  |  50 ++
 .../apache/tajo/engine/function/math/Sqrt.java  |  50 ++
 .../tajo/engine/function/string/Ascii.java      |  57 ++
 .../apache/tajo/engine/function/string/Chr.java |  54 ++
 .../tajo/engine/function/string/Decode.java     |  79 ++
 .../tajo/engine/function/string/Encode.java     |  81 ++
 .../function/string/HexStringConverter.java     |  65 ++
 .../tajo/engine/function/string/Locate.java     |  94 +++
 .../tajo/engine/function/string/Lpad.java       |  78 ++
 .../tajo/engine/function/string/QuoteIdent.java |  54 ++
 .../tajo/engine/function/string/Rpad.java       |  77 ++
 .../tajo/engine/function/string/SplitPart.java  |  15 +-
 .../tajo/engine/function/string/Substr.java     |  37 +-
 .../tajo/engine/function/string/ToBin.java      |  54 ++
 .../tajo/engine/parser/HiveConverter.java       |  16 +-
 .../apache/tajo/engine/parser/SQLAnalyzer.java  | 224 +++--
 .../tajo/engine/planner/AlgebraVisitor.java     |  65 +-
 .../tajo/engine/planner/BaseAlgebraVisitor.java | 611 +++++++++++---
 .../engine/planner/BasicLogicalPlanVisitor.java |  14 +-
 .../tajo/engine/planner/LogicalOptimizer.java   |  60 +-
 .../apache/tajo/engine/planner/LogicalPlan.java |  17 +-
 .../tajo/engine/planner/LogicalPlanVisitor.java |   2 +
 .../tajo/engine/planner/LogicalPlanner.java     | 272 ++++--
 .../engine/planner/PhysicalPlannerImpl.java     |  93 +-
 .../apache/tajo/engine/planner/PlannerUtil.java |  73 +-
 .../org/apache/tajo/engine/planner/Target.java  |   4 +
 .../tajo/engine/planner/global/DataChannel.java |  83 +-
 .../engine/planner/global/ExecutionBlock.java   |   5 +-
 .../planner/global/ExecutionBlockCursor.java    |  34 +-
 .../planner/global/ExecutionBlockPID.java       |  43 +
 .../engine/planner/global/ExecutionPlan.java    |  34 +-
 .../engine/planner/global/GlobalPlanner.java    | 637 +++++++-------
 .../tajo/engine/planner/global/MasterPlan.java  |   1 +
 .../engine/planner/logical/CreateTableNode.java |  20 +-
 .../engine/planner/logical/DropTableNode.java   |  15 +-
 .../tajo/engine/planner/logical/LimitNode.java  |   2 +-
 .../engine/planner/logical/LogicalNode.java     |   4 +
 .../tajo/engine/planner/logical/NodeType.java   |   1 +
 .../logical/PartitionedTableScanNode.java       | 180 ++++
 .../engine/planner/logical/RelationNode.java    |   2 +-
 .../tajo/engine/planner/logical/ScanNode.java   |  19 +-
 .../engine/planner/logical/StoreTableNode.java  |  24 +-
 .../planner/logical/TableSubQueryNode.java      |   4 +-
 .../planner/logical/join/FoundJoinOrder.java    |  15 +-
 .../join/GreedyHeuristicJoinOrderAlgorithm.java | 378 ++++-----
 .../engine/planner/logical/join/JoinEdge.java   |  17 +-
 .../engine/planner/logical/join/JoinGraph.java  |  39 +-
 .../logical/join/JoinOrderAlgorithm.java        |  16 +-
 .../ColumnPartitionedTableStoreExec.java        | 210 +++++
 .../planner/physical/HashFullOuterJoinExec.java |   2 +-
 .../planner/physical/HashLeftOuterJoinExec.java |   2 +-
 .../planner/physical/IndexedStoreExec.java      |   4 +-
 .../planner/physical/NLLeftOuterJoinExec.java   |   2 +-
 .../planner/physical/PartitionedStoreExec.java  |   4 +-
 .../engine/planner/physical/PhysicalExec.java   |   6 +-
 .../engine/planner/physical/SeqScanExec.java    |  71 +-
 .../planner/rewrite/FilterPushDownRule.java     |  47 +-
 .../rewrite/PartitionedTableRewriter.java       | 371 ++++++++
 .../planner/rewrite/ProjectionPushDownRule.java |  32 +-
 .../org/apache/tajo/engine/utils/TupleUtil.java | 106 +++
 .../org/apache/tajo/jdbc/TajoResultSet.java     |   4 +-
 .../tajo/master/AbstractTaskScheduler.java      |   2 +-
 .../tajo/master/DefaultTaskScheduler.java       |  19 +-
 .../org/apache/tajo/master/GlobalEngine.java    |  68 +-
 .../apache/tajo/master/TajoAsyncDispatcher.java |   6 +-
 .../apache/tajo/master/TajoContainerProxy.java  |   4 -
 .../java/org/apache/tajo/master/TajoMaster.java | 306 ++++++-
 .../tajo/master/TajoMasterClientService.java    |  51 +-
 .../apache/tajo/master/TajoMasterService.java   |  30 +-
 .../apache/tajo/master/YarnContainerProxy.java  |  68 +-
 .../tajo/master/YarnTaskRunnerLauncherImpl.java |   2 +-
 .../event/QueryDiagnosticsUpdateEvent.java      |   2 +-
 .../tajo/master/event/QueryEventType.java       |   1 -
 .../event/SubQueryDiagnosticsUpdateEvent.java   |  34 +
 .../tajo/master/event/SubQueryEventType.java    |   2 +
 .../tajo/master/event/TaskFatalErrorEvent.java  |  12 +-
 .../master/metrics/CatalogMetricsGaugeSet.java  |  54 ++
 .../metrics/WorkerResourceMetricsGaugeSet.java  |  74 ++
 .../apache/tajo/master/querymaster/Query.java   |  96 ++-
 .../master/querymaster/QueryInProgress.java     |   6 +-
 .../master/querymaster/QueryJobManager.java     |   4 +-
 .../tajo/master/querymaster/QueryMaster.java    |  74 +-
 .../querymaster/QueryMasterManagerService.java  |   5 +-
 .../master/querymaster/QueryMasterRunner.java   |   2 +-
 .../master/querymaster/QueryMasterTask.java     |  30 +-
 .../master/querymaster/QueryUnitAttempt.java    |  25 +-
 .../tajo/master/querymaster/Repartitioner.java  |   2 +-
 .../tajo/master/querymaster/SubQuery.java       | 301 +++++--
 .../tajo/master/querymaster/SubQueryState.java  |   3 +-
 .../tajo/master/rm/TajoWorkerContainer.java     |  26 +-
 .../tajo/master/rm/TajoWorkerContainerId.java   |   5 +
 .../master/rm/TajoWorkerResourceManager.java    |   8 +-
 .../master/rm/YarnRMContainerAllocator.java     |  31 +-
 .../tajo/master/rm/YarnTajoResourceManager.java |  63 +-
 .../apache/tajo/util/ApplicationIdUtils.java    |   2 +-
 .../util/metrics/GroupNameMetricsFilter.java    |  43 +
 .../tajo/util/metrics/LogEventGaugeSet.java     |  64 ++
 .../tajo/util/metrics/MetricsFilterList.java    |  43 +
 .../tajo/util/metrics/RegexpMetricsFilter.java  |  51 ++
 .../tajo/util/metrics/TajoLogEventCounter.java  |  86 ++
 .../apache/tajo/util/metrics/TajoMetrics.java   | 133 +++
 .../tajo/util/metrics/TajoSystemMetrics.java    | 213 +++++
 .../util/metrics/reporter/GangliaReporter.java  | 258 ++++++
 .../reporter/MetricsConsoleReporter.java        |  80 ++
 .../MetricsConsoleScheduledReporter.java        |  32 +
 .../reporter/MetricsFileScheduledReporter.java  |  57 ++
 .../MetricsStreamScheduledReporter.java         | 179 ++++
 .../util/metrics/reporter/NullReporter.java     |  31 +
 .../metrics/reporter/TajoMetricsReporter.java   | 232 +++++
 .../reporter/TajoMetricsScheduledReporter.java  | 206 +++++
 .../tajo/webapp/QueryExecutorServlet.java       |  17 +-
 .../tajo/worker/AbstractResourceAllocator.java  |   2 +-
 .../org/apache/tajo/worker/DeletionService.java | 115 +++
 .../tajo/worker/TajoResourceAllocator.java      |   8 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |  98 ++-
 .../tajo/worker/TajoWorkerClientService.java    |  13 +-
 .../tajo/worker/TajoWorkerManagerService.java   |  11 +-
 .../main/java/org/apache/tajo/worker/Task.java  |   6 +-
 .../java/org/apache/tajo/worker/TaskRunner.java |   4 +-
 .../apache/tajo/worker/TaskRunnerManager.java   |   6 +-
 .../tajo/worker/YarnResourceAllocator.java      |   7 +-
 .../src/main/proto/ClientProtos.proto           |   8 +-
 .../src/main/proto/InternalTypes.proto          |   2 +-
 .../main/proto/TajoMasterClientProtocol.proto   |   3 +-
 .../src/main/proto/TajoMasterProtocol.proto     |  12 +-
 .../src/main/proto/TajoWorkerProtocol.proto     |   4 +-
 .../src/main/resources/tajo-metrics.properties  |  75 ++
 .../org/apache/tajo/BackendTestingUtil.java     |   3 -
 .../org/apache/tajo/MiniTajoYarnCluster.java    |  10 +-
 .../org/apache/tajo/TajoTestingCluster.java     |   3 +-
 .../test/java/org/apache/tajo/TestTajoIds.java  |   2 +-
 .../org/apache/tajo/client/TestTajoClient.java  | 119 ++-
 .../apache/tajo/engine/eval/ExprTestBase.java   |   8 +-
 .../tajo/engine/eval/TestEvalTreeUtil.java      |  24 +-
 .../tajo/engine/eval/TestSQLDateTimeTypes.java  |  38 +
 .../tajo/engine/eval/TestSQLExpression.java     | 115 +++
 .../engine/function/TestDateTimeFunctions.java  |  50 ++
 .../tajo/engine/function/TestMathFunctions.java | 289 ++++++-
 .../TestStringOperatorsAndFunctions.java        | 208 ++++-
 .../tajo/engine/parser/TestHiveConverter.java   |  12 +-
 .../tajo/engine/parser/TestSQLAnalyzer.java     |   7 +-
 .../engine/planner/TestLogicalOptimizer.java    |   2 +-
 .../tajo/engine/planner/TestLogicalPlan.java    |   2 +-
 .../planner/physical/TestBSTIndexExec.java      |   2 +-
 .../planner/physical/TestHashAntiJoinExec.java  |   2 +-
 .../planner/physical/TestHashSemiJoinExec.java  |   2 +-
 .../planner/physical/TestPhysicalPlanner.java   |  16 +-
 .../engine/planner/physical/TestSortExec.java   |   2 +-
 .../tajo/engine/query/TestCaseByCases.java      |  82 ++
 .../engine/query/TestCreateTableStatement.java  |  57 ++
 .../tajo/engine/query/TestGroupByQuery.java     |  28 +
 .../tajo/engine/query/TestInsertQuery.java      |  10 +-
 .../apache/tajo/engine/query/TestJoinQuery.java |  74 +-
 .../tajo/engine/query/TestSelectQuery.java      |  21 +-
 .../tajo/engine/query/TestTablePartitions.java  | 360 ++++++++
 .../tajo/engine/query/TestTableSubQuery.java    |  23 +
 .../apache/tajo/engine/util/TestTupleUtil.java  |  52 +-
 .../tajo/master/TestExecutionBlockCursor.java   |   8 +-
 .../apache/tajo/master/TestGlobalPlanner.java   |   2 +-
 .../tajo/util/metrics/TestMetricsFilter.java    |  52 ++
 .../tajo/util/metrics/TestSystemMetrics.java    | 133 +++
 .../tajo/worker/TestRangeRetrieverHandler.java  |  17 +-
 .../src/test/queries/complex_union_1.sql        |   4 +-
 .../create_table_partition_by_column.sql        |   2 +-
 .../test/queries/create_table_various_types.sql |  48 ++
 .../src/test/queries/tajo415_case.sql           |  33 +
 .../src/test/queries/tajo418_case.sql           |  29 +
 .../src/test/tpch/customer.tbl                  |   2 +
 .../tajo/pullserver/FadvisedChunkedFile.java    |   6 +-
 .../tajo/pullserver/FadvisedFileRegion.java     |   6 +-
 .../tajo/pullserver/PullServerAuxService.java   |  22 +-
 .../tajo/pullserver/TajoPullServerService.java  |   2 +-
 .../tajo/storage/AbstractStorageManager.java    |  18 +-
 .../storage/BinarySerializeDeserialize.java     | 256 ------
 .../storage/BinarySerializerDeserializer.java   | 257 ++++++
 .../java/org/apache/tajo/storage/CSVFile.java   |  82 +-
 .../tajo/storage/CompressedSplitLineReader.java | 182 ++++
 .../org/apache/tajo/storage/FileScanner.java    |  24 +
 .../java/org/apache/tajo/storage/LazyTuple.java |  10 +-
 .../org/apache/tajo/storage/LineReader.java     |  10 +-
 .../java/org/apache/tajo/storage/RawFile.java   |  85 +-
 .../org/apache/tajo/storage/RowStoreUtil.java   |  33 +-
 .../tajo/storage/SerializeDeserialize.java      |  34 -
 .../tajo/storage/SerializerDeserializer.java    |  34 +
 .../apache/tajo/storage/SplitLineReader.java    |  39 +
 .../tajo/storage/TextSerializeDeserialize.java  | 194 -----
 .../storage/TextSerializerDeserializer.java     | 209 +++++
 .../org/apache/tajo/storage/rcfile/RCFile.java  |  12 +-
 .../apache/tajo/storage/v2/RCFileScanner.java   |  11 +-
 .../tajo/storage/TestCompressionStorages.java   |   2 +-
 .../org/apache/tajo/storage/TestLazyTuple.java  |  22 +-
 .../org/apache/tajo/storage/TestStorages.java   |   4 +-
 .../apache/tajo/storage/index/TestBSTIndex.java | 845 ++++++-------------
 tajo-dist/pom.xml                               |   2 -
 tajo-dist/src/main/bin/tajo                     |  24 +
 tajo-dist/src/main/conf/tajo-env.sh             |   3 +
 tajo-project/pom.xml                            |  18 +-
 .../src/site/markdown/tajo-0.2.0-doc.md         |   2 +-
 .../src/site/markdown/tajo-0.8.0-doc.md         |  66 +-
 320 files changed, 15486 insertions(+), 5160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2495601c/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2495601c/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2495601c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --cc tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index b10418c,5bba89b..bfc1d16
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@@ -47,7 -46,7 +47,6 @@@ public class LogicalPlan 
    /** it indicates the root block */
    public static final String ROOT_BLOCK = VIRTUAL_TABLE_PREFIX + "ROOT";
    public static final String NONAME_BLOCK_PREFIX = VIRTUAL_TABLE_PREFIX + "NONAME_";
--  private int nextPid = 0;
    private Integer noNameBlockId = 0;
    private Integer noNameColumnId = 0;
  
@@@ -60,6 -59,6 +59,16 @@@
    /** planning and optimization log */
    private List<String> planingHistory = Lists.newArrayList();
  
++  private PIDFactory pidFactory = new PIDFactory();
++
++  public static class PIDFactory {
++    private int nextPid = 0;
++
++    public int newPID() {
++      return nextPid++;
++    }
++  }
++
    public LogicalPlan(LogicalPlanner planner) {
      this.planner = planner;
    }
@@@ -76,8 -75,8 +85,12 @@@
      return block;
    }
  
++  public PIDFactory getPidFactory() {
++    return pidFactory;
++  }
++
    public int newPID() {
--    return nextPid++;
++    return pidFactory.newPID();
    }
  
    public QueryBlock newNoNameBlock() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2495601c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --cc tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 87daa8f,5120106..c3cf697
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@@ -73,58 -70,43 +73,86 @@@ public class PhysicalPlannerImpl implem
      PhysicalExec execPlan;
  
      try {
 -      execPlan = createPlanRecursive(context, logicalPlan);
 -      if (execPlan instanceof StoreTableExec
 -          || execPlan instanceof IndexedStoreExec
 -          || execPlan instanceof PartitionedStoreExec
 -          || execPlan instanceof ColumnPartitionedTableStoreExec) {
 -        return execPlan;
 -      } else if (context.getDataChannel() != null) {
 -        return buildOutputOperator(context, logicalPlan, execPlan);
 -      } else {
 -        return execPlan;
 -      }
 +      plan = checkOutputOperator(context, plan);
 +      execPlan = createPlanRecursive(context, plan, plan.getTerminalNode());
 +
 +      return execPlan;
++//=======
++//      execPlan = createPlanRecursive(context, logicalPlan);
++//      if (execPlan instanceof StoreTableExec
++//          || execPlan instanceof IndexedStoreExec
++//          || execPlan instanceof PartitionedStoreExec
++//          || execPlan instanceof ColumnPartitionedTableStoreExec) {
++//        return execPlan;
++//      } else if (context.getDataChannel() != null) {
++//        return buildOutputOperator(context, logicalPlan, execPlan);
++//      } else {
++//        return execPlan;
++//      }
++//>>>>>>> 3a5a617c6bb1dd10d026ab0735f9031623a66d30
      } catch (IOException ioe) {
        throw new InternalException(ioe);
      }
    }
  
 -  private PhysicalExec buildOutputOperator(TaskAttemptContext context, LogicalNode plan,
 -                                           PhysicalExec execPlan) throws IOException {
 -    DataChannel channel = context.getDataChannel();
 -    StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
 -    if(context.isInterQuery()) storeTableNode.setStorageType(context.getDataChannel().getStoreType());
 -    storeTableNode.setInSchema(plan.getOutSchema());
 -    storeTableNode.setOutSchema(plan.getOutSchema());
 -    if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
 -      storeTableNode.setPartitions(channel.getPartitionType(), channel.getPartitionKey(), channel.getPartitionNum());
 -    } else {
 -      storeTableNode.setDefaultParition();
 -    }
 -    storeTableNode.setChild(plan);
 +  @VisibleForTesting
 +  public PhysicalExec createPlanWithoutMaterialize(final TaskAttemptContext context, ExecutionPlan plan)
 +      throws InternalException {
 +    PhysicalExec execPlan;
  
 -    PhysicalExec outExecPlan = createStorePlan(context, storeTableNode, execPlan);
 -    return outExecPlan;
 +    try {
 +      execPlan = createPlanRecursive(context, plan, plan.getTerminalNode());
 +
 +      return execPlan;
 +    } catch (IOException ioe) {
 +      throw new InternalException(ioe);
 +    }
 +  }
++//=======
++//  private PhysicalExec buildOutputOperator(TaskAttemptContext context, LogicalNode plan,
++//                                           PhysicalExec execPlan) throws IOException {
++//    DataChannel channel = context.getDataChannel();
++//    StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
++//    if(context.isInterQuery()) storeTableNode.setStorageType(context.getDataChannel().getStoreType());
++//    storeTableNode.setInSchema(plan.getOutSchema());
++//    storeTableNode.setOutSchema(plan.getOutSchema());
++//    if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
++//      storeTableNode.setPartitions(channel.getPartitionType(), channel.getPartitionKey(), channel.getPartitionNum());
++//    } else {
++//      storeTableNode.setDefaultParition();
++//>>>>>>> 3a5a617c6bb1dd10d026ab0735f9031623a66d30
++//    }
++//  }
 +
 +  private ExecutionPlan checkOutputOperator(TaskAttemptContext context, ExecutionPlan plan) {
 +    LogicalNode root = plan.getTerminalNode();
 +    List<DataChannel> channels = context.getOutgoingChannels();
 +    for (DataChannel channel : channels) {
-       LogicalNode node = plan.getRootChild(channel.getSrcPID());
++      LogicalNode node = plan.getTopNodeFromPID(channel.getSrcPID());
 +      if (node.getType() != NodeType.STORE) {
 +        StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
 +        storeTableNode.setStorageType(CatalogProtos.StoreType.CSV);
 +        storeTableNode.setInSchema(channel.getSchema());
 +        storeTableNode.setOutSchema(channel.getSchema());
 +        if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
 +          storeTableNode.setPartitions(channel.getPartitionType(), channel.getPartitionKey(), channel.getPartitionNum());
 +        } else {
 +          storeTableNode.setDefaultParition();
 +        }
 +
 +        plan.remove(node, root);
 +        plan.add(node, storeTableNode, Tag.SINGLE);
 +        plan.add(storeTableNode, root, Tag.SINGLE);
 +        channel.updateSrcPID(storeTableNode.getPID());
 +      }
 +    }
 +    return plan;
    }
  
 -  private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, LogicalNode logicalNode) throws IOException {
 +  private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, ExecutionPlan plan, LogicalNode logicalNode) throws IOException {
      PhysicalExec leftExec;
      PhysicalExec rightExec;
 +    PhysicalExec currentExec;
  
      switch (logicalNode.getType()) {
  
@@@ -167,20 -135,14 +195,22 @@@
  
        case TABLE_SUBQUERY: {
          TableSubQueryNode subQueryNode = (TableSubQueryNode) logicalNode;
 -        leftExec = createPlanRecursive(ctx, subQueryNode.getSubQuery());
 -        return leftExec;
 +        leftExec = createPlanRecursive(ctx, plan, subQueryNode.getSubQuery());
 +        if (plan.getParentCount(logicalNode) > 1) {
 +          return new MultiOutputExec(ctx, leftExec.getSchema(), leftExec, plan.getParentCount(logicalNode));
 +        } else {
 +          return leftExec;
 +        }
  
-       } case SCAN:
+       }
+       case PARTITIONS_SCAN:
+       case SCAN:
          leftExec = createScanPlan(ctx, (ScanNode) logicalNode);
 -        return leftExec;
 +        if (plan.getParentCount(logicalNode) > 1) {
 +          return new MultiOutputExec(ctx, leftExec.getSchema(), leftExec, plan.getParentCount(logicalNode));
 +        } else {
 +          return leftExec;
 +        }
  
        case GROUP_BY:
          GroupbyNode grpNode = (GroupbyNode) logicalNode;
@@@ -361,11 -292,10 +391,15 @@@
      }
    }
  
 -  private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
 +  private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode node,
                                                 PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
 -    String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
 -    String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
 +    List<LogicalNode> childs = plan.getChilds(node);
-     String [] leftLineage = PlannerUtil.getLineage(plan, childs.get(0));
-     String [] rightLineage = PlannerUtil.getLineage(plan, childs.get(1));
++    String [] leftLineage = PlannerUtil.getRelationLineage(plan, childs.get(0));
++    String [] rightLineage = PlannerUtil.getRelationLineage(plan, childs.get(1));
++//=======
++//    String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
++//    String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
++//>>>>>>> 3a5a617c6bb1dd10d026ab0735f9031623a66d30
      long leftSize = estimateSizeRecursive(context, leftLineage);
      long rightSize = estimateSizeRecursive(context, rightLineage);
  
@@@ -435,9 -365,9 +469,12 @@@
      }
    }
  
 -  private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
 +  private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
                                                     PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
-     String [] rightLineage = PlannerUtil.getLineage(plan.getChild(join, 1));
 -    String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
++    String [] rightLineage = PlannerUtil.getRelationLineage(plan.getChild(join, 1));
++//=======
++//    String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
++//>>>>>>> 3a5a617c6bb1dd10d026ab0735f9031623a66d30
      long rightTableVolume = estimateSizeRecursive(context, rightLineage);
  
      if (rightTableVolume < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)) {
@@@ -456,13 -386,13 +493,16 @@@
                                                 PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
      //if the left operand is small enough => implement it as a left outer hash join with exchanged operators (note:
      // blocking, but merge join is blocking as well)
-     String [] outerLineage4 = PlannerUtil.getLineage(plan.getChild(join, 0));
 -    String [] outerLineage4 = PlannerUtil.getRelationLineage(plan.getLeftChild());
++    String [] outerLineage4 = PlannerUtil.getRelationLineage(plan.getChild(join, 0));
++//=======
++//    String [] outerLineage4 = PlannerUtil.getRelationLineage(plan.getLeftChild());
++//>>>>>>> 3a5a617c6bb1dd10d026ab0735f9031623a66d30
      long outerSize = estimateSizeRecursive(context, outerLineage4);
      if (outerSize < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)){
 -      LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
 -      return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
 +      LOG.info("Right Outer Join (" + join.getPID() +") chooses [Hash Join].");
 +      return new HashLeftOuterJoinExec(context, join, rightExec, leftExec);
      } else {
 -      return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
 +      return createRightOuterMergeJoinPlan(context, join, leftExec, rightExec);
      }
    }
  
@@@ -524,11 -454,11 +564,15 @@@
      }
    }
  
 -  private HashFullOuterJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, JoinNode plan,
 +  private HashFullOuterJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
                                                              PhysicalExec leftExec, PhysicalExec rightExec)
        throws IOException {
-     String [] leftLineage = PlannerUtil.getLineage(plan.getChild(join, 0));
-     String [] rightLineage = PlannerUtil.getLineage(plan.getChild(join, 1));
 -    String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
 -    String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
++    String [] leftLineage = PlannerUtil.getRelationLineage(plan.getChild(join, 0));
++    String [] rightLineage = PlannerUtil.getRelationLineage(plan.getChild(join, 1));
++//=======
++//    String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
++//    String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
++//>>>>>>> 3a5a617c6bb1dd10d026ab0735f9031623a66d30
      long outerSize2 = estimateSizeRecursive(context, leftLineage);
      long innerSize2 = estimateSizeRecursive(context, rightLineage);
  
@@@ -562,10 -492,10 +606,14 @@@
      return new MergeFullOuterJoinExec(context, plan, outerSort3, innerSort3, sortSpecs3[0], sortSpecs3[1]);
    }
  
 -  private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
 +  private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
                                                     PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
-     String [] leftLineage = PlannerUtil.getLineage(plan.getChild(join, 0));
-     String [] rightLineage = PlannerUtil.getLineage(plan.getChild(join, 1));
 -    String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
 -    String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
++    String [] leftLineage = PlannerUtil.getRelationLineage(plan.getChild(join, 0));
++    String [] rightLineage = PlannerUtil.getRelationLineage(plan.getChild(join, 1));
++//=======
++//    String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
++//    String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
++//>>>>>>> 3a5a617c6bb1dd10d026ab0735f9031623a66d30
      long outerSize2 = estimateSizeRecursive(context, leftLineage);
      long innerSize2 = estimateSizeRecursive(context, rightLineage);
      final long threshold = 1048576 * 128;
@@@ -782,7 -719,7 +843,10 @@@
        return createInMemoryHashAggregation(context, groupbyNode, subOp);
      }
  
-     String [] outerLineage = PlannerUtil.getLineage(plan, plan.getChilds(groupbyNode).get(0));
 -    String [] outerLineage = PlannerUtil.getRelationLineage(groupbyNode.getChild());
++    String [] outerLineage = PlannerUtil.getRelationLineage(plan, plan.getChilds(groupbyNode).get(0));
++//=======
++//    String [] outerLineage = PlannerUtil.getRelationLineage(groupbyNode.getChild());
++//>>>>>>> 3a5a617c6bb1dd10d026ab0735f9031623a66d30
      long estimatedSize = estimateSizeRecursive(context, outerLineage);
      final long threshold = conf.getLongVar(TajoConf.ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD);
  
@@@ -809,12 -746,12 +873,15 @@@
        }
      }
  
 -    return createBestSortPlan(context, sortNode, child);
 +    return createBestSortPlan(context, plan, sortNode, child);
    }
  
 -  public SortExec createBestSortPlan(TaskAttemptContext context, SortNode sortNode,
 +  public SortExec createBestSortPlan(TaskAttemptContext context, ExecutionPlan plan, SortNode sortNode,
                                       PhysicalExec child) throws IOException {
-     String [] outerLineage = PlannerUtil.getLineage(plan, plan.getChilds(sortNode).get(0));
 -    String [] outerLineage = PlannerUtil.getRelationLineage(sortNode.getChild());
++    String [] outerLineage = PlannerUtil.getRelationLineage(plan, plan.getChilds(sortNode).get(0));
++//=======
++//    String [] outerLineage = PlannerUtil.getRelationLineage(sortNode.getChild());
++//>>>>>>> 3a5a617c6bb1dd10d026ab0735f9031623a66d30
      long estimatedSize = estimateSizeRecursive(context, outerLineage);
      final long threshold = 1048576 * 2000;
  
@@@ -834,12 -771,11 +901,12 @@@
      Preconditions.checkNotNull(ctx.getTable(annotation.getCanonicalName()),
          "Error: There is no table matched to %s", annotation.getCanonicalName());
  
 -    FragmentProto [] fragmentProtos = ctx.getTables(annotation.getTableName());
 +    FragmentProto[] fragmentProtos = ctx.getTables(annotation.getTableName());
      List<FileFragment> fragments =
-         FragmentConvertor.convert(ctx.getConf(), CatalogProtos.StoreType.CSV, fragmentProtos);
 -        FragmentConvertor.convert(ctx.getConf(), ctx.getDataChannel().getStoreType(), fragmentProtos);
++        FragmentConvertor.convert(ctx.getConf(), ctx.getIncomingChannels().get(0).getStoreType(), fragmentProtos);
  
 -    String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys());
 +    String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0),
 +        annotation.getSortKeys());
      Path indexPath = new Path(sm.getTablePath(annotation.getTableName()), "index");
  
      TupleComparator comp = new TupleComparator(annotation.getKeySchema(),

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2495601c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --cc tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 3b8bd08,d541680..e9bb071
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@@ -29,9 -28,8 +28,10 @@@ import org.apache.tajo.catalog.Schema
  import org.apache.tajo.catalog.SortSpec;
  import org.apache.tajo.common.TajoDataTypes.DataType;
  import org.apache.tajo.engine.eval.*;
 -import org.apache.tajo.engine.planner.logical.*;
+ import org.apache.tajo.engine.exception.InvalidQueryException;
++import org.apache.tajo.engine.planner.LogicalPlan.PIDFactory;
 +import org.apache.tajo.engine.planner.global.ExecutionPlan;
 +import org.apache.tajo.engine.planner.logical.*;
- import org.apache.tajo.engine.exception.InvalidQueryException;
  import org.apache.tajo.storage.TupleComparator;
  
  import java.util.*;
@@@ -63,16 -65,40 +67,51 @@@ public class PlannerUtil 
      return tableNames;
    }
  
-   public static String [] getLineage(ExecutionPlan plan, LogicalNode node) {
++  public static String [] getRelationLineage(ExecutionPlan plan, LogicalNode node) {
 +    LogicalNode [] scans =  PlannerUtil.findAllNodes(plan, node, NodeType.SCAN);
 +    String [] tableNames = new String[scans.length];
 +    ScanNode scan;
 +    for (int i = 0; i < scans.length; i++) {
 +      scan = (ScanNode) scans[i];
 +      tableNames[i] = scan.getCanonicalName();
 +    }
 +    return tableNames;
 +  }
++
+   /**
++   * Get the canonical names of the relation nodes found from the start node within a query block
+    *
++   * @param plan the logical plan that is handed to the {@link RelationFinderVisitor}
+    * @param node a start node
++   * @return a collection of the found relation names
+    */
+   public static Collection<String> getRelationLineageWithinQueryBlock(LogicalPlan plan, LogicalNode node)
+       throws PlanningException {
+     RelationFinderVisitor visitor = new RelationFinderVisitor();
+     visitor.visit(null, plan, node);
+     return visitor.getFoundRelations();
+   }
+ 
+   public static class RelationFinderVisitor extends BasicLogicalPlanVisitor<Object, LogicalNode> {
+     private Set<String> foundRelNameSet = Sets.newHashSet();
+ 
+     public Set<String> getFoundRelations() {
+       return foundRelNameSet;
+     }
+ 
+     @Override
+     public LogicalNode visitChild(Object context, LogicalPlan plan, LogicalNode node, Stack<LogicalNode> stack)
+         throws PlanningException {
+       if (node.getType() != NodeType.TABLE_SUBQUERY) {
+         super.visitChild(context, plan, node, stack);
+       }
+ 
+       if (node instanceof RelationNode) {
+         foundRelNameSet.add(((RelationNode) node).getCanonicalName());
+       }
+ 
+       return node;
+     }
+   }
    
    /**
     * Delete the logical node from a plan.
@@@ -344,21 -279,6 +383,21 @@@
      return child;
    }
  
 +  public static SortNode[] transformSortTo2p(LogicalPlan plan, SortNode sort) {
 +    Preconditions.checkArgument(sort != null);
 +    SortNode parent = null, child = null;
 +    try {
 +      parent = sort;
 +      child = (SortNode) sort.clone();
 +      child.setPid(plan.newPID());
 +    } catch (CloneNotSupportedException e) {
-       LOG.warn(e);
++      throw new RuntimeException(e);
 +    }
 +
 +    parent.setInSchema(child.getOutSchema());
 +
 +    return new SortNode[]{parent, child};
 +  }
    
    /**
     * Find the top logical node matched to type from the given node
@@@ -866,9 -677,11 +905,11 @@@
      return copy;
    }
  
-   public static <T extends LogicalNode> T clone(LogicalNode node) {
 -  public static <T extends LogicalNode> T clone(LogicalPlan plan, LogicalNode node) {
++  public static <T extends LogicalNode> T clone(PIDFactory pidFactory, LogicalNode node) {
      try {
-       return (T) node.clone();
+       T copy = (T) node.clone();
 -      copy.setPID(plan.newPID());
++      copy.setPID(pidFactory.newPID());
+       return copy;
      } catch (CloneNotSupportedException e) {
        throw new RuntimeException(e);
      }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2495601c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --cc tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
index 26cedd7,556c7ff..034d71f
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@@ -27,8 -27,8 +27,10 @@@ import static org.apache.tajo.catalog.p
  import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
  
  public class DataChannel {
--  private ExecutionBlockId srcId;
--  private ExecutionBlockId targetId;
++//  private ExecutionBlockId srcId;
++//  private ExecutionBlockId targetId;
++  private ExecutionBlockPID srcId;
++  private ExecutionBlockPID targetId;
    private TransmitType transmitType = TransmitType.PULL_TRANSMIT;
    private PartitionType partitionType;
    private Integer partitionNum = 1;
@@@ -36,21 -36,15 +38,20 @@@
  
    private Schema schema;
  
-   private StoreType storeType = StoreType.CSV;
- 
-   private Integer srcPID;
-   private Integer targetPID;
+   private StoreType storeType = StoreType.RAW;
  
 -  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId) {
 -    this.srcId = srcId;
 -    this.targetId = targetId;
 +  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, Integer srcPID, Integer targetPID) {
-     this.srcId = srcId;
-     this.targetId = targetId;
-     this.srcPID = srcPID;
-     this.targetPID = targetPID;
++//    this.srcId = srcId;
++//    this.targetId = targetId;
++    this.srcId = new ExecutionBlockPID(srcId, srcPID);
++    this.targetId = new ExecutionBlockPID(targetId, targetPID);
++//    this.srcPID = srcPID;
++//    this.targetPID = targetPID;
    }
  
 -  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, PartitionType partitionType) {
 -    this(srcId, targetId);
 +  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, Integer srcPID, Integer targetPID,
 +                     PartitionType partitionType) {
 +    this(srcId, targetId, srcPID, targetPID);
      this.partitionType = partitionType;
    }
  
@@@ -67,8 -59,8 +68,8 @@@
    }
  
    public DataChannel(DataChannelProto proto) {
--    this.srcId = new ExecutionBlockId(proto.getSrcId());
--    this.targetId = new ExecutionBlockId(proto.getTargetId());
++//    this.srcId = new ExecutionBlockId(proto.getSrcId());
++//    this.targetId = new ExecutionBlockId(proto.getTargetId());
      this.transmitType = proto.getTransmitType();
      this.partitionType = proto.getPartitionType();
      if (proto.hasSchema()) {
@@@ -85,20 -77,18 +86,37 @@@
      if (proto.hasPartitionNum()) {
        this.partitionNum = proto.getPartitionNum();
      }
-     if (proto.hasSrcPID()) {
-       this.srcPID = proto.getSrcPID();
-     }
-     if (proto.hasTargetPID()) {
-       this.targetPID = proto.getTargetPID();
++//    if (proto.hasSrcPID()) {
++//      this.srcPID = proto.getSrcPID();
++//    }
++//    if (proto.hasTargetPID()) {
++//      this.targetPID = proto.getTargetPID();
++//    }
++
++    Integer srcPID = proto.hasSrcPID() ? proto.getSrcPID() : null;
++    Integer targetPID = proto.hasTargetPID() ? proto.getTargetPID() : null;
++    this.srcId = new ExecutionBlockPID(new ExecutionBlockId(proto.getSrcId()), srcPID);
++    this.targetId = new ExecutionBlockPID(new ExecutionBlockId(proto.getTargetId()), targetPID);
+ 
+     if (proto.hasStoreType()) {
+       this.storeType = proto.getStoreType();
      }
    }
  
    public ExecutionBlockId getSrcId() {
--    return srcId;
++    return srcId.getExecutionBlockId();
    }
  
    public ExecutionBlockId getTargetId() {
--    return targetId;
++    return targetId.getExecutionBlockId();
++  }
++
++  public ExecutionBlockPID getSrcExecutionPID() {
++    return srcId;
++  }
++
++  /** Returns the (execution block id, PID) pair of the target side of this channel. */
++  public ExecutionBlockPID getTargetExecutionPID() {
++    return targetId;
++  }
  
    public PartitionType getPartitionType() {
@@@ -160,8 -150,8 +178,8 @@@
  
    public DataChannelProto getProto() {
      DataChannelProto.Builder builder = DataChannelProto.newBuilder();
--    builder.setSrcId(srcId.getProto());
--    builder.setTargetId(targetId.getProto());
++    builder.setSrcId(srcId.getExecutionBlockId().getProto());
++    builder.setTargetId(targetId.getExecutionBlockId().getProto());
      if (transmitType != null) {
        builder.setTransmitType(transmitType);
      }
@@@ -177,11 -167,9 +195,15 @@@
      if (partitionNum != null) {
        builder.setPartitionNum(partitionNum);
      }
-     if (srcPID != null) {
-       builder.setSrcPID(srcPID);
++    if (srcId.getPid() != null) {
++      builder.setSrcPID(srcId.getPid());
 +    }
-     if (targetPID != null) {
-       builder.setTargetPID(targetPID);
++    if (targetId.getPid() != null) {
++      builder.setTargetPID(targetId.getPid());
++    }
+ 
+     if(storeType != null){
+       builder.setStoreType(storeType);
      }
      return builder.build();
    }
@@@ -196,8 -184,8 +218,9 @@@
  
    public String toString() {
      StringBuilder sb = new StringBuilder();
--    sb.append("[").append(srcId.getQueryId()).append("] ");
-     sb.append(srcId.getId()).append("."+srcPID).append(" => ").append(targetId.getId()).append("."+targetPID);
 -    sb.append(srcId.getId()).append(" => ").append(targetId.getId());
++    sb.append("[").append(srcId.getExecutionBlockId().getQueryId()).append("] ");
++    sb.append(srcId.getExecutionBlockId().getId()).append("."+srcId.getPid()).append(" => ")
++        .append(targetId.getExecutionBlockId().getId()).append("."+targetId.getPid());
      sb.append(" (type=").append(partitionType);
      if (hasPartitionKey()) {
        sb.append(", key=");
@@@ -215,16 -203,4 +238,16 @@@
      sb.append(")");
      return sb.toString();
    }
 +
 +  public void updateSrcPID(int srcPID) {
-     this.srcPID = srcPID;
++    this.srcId.updatePid(srcPID);
 +  }
 +
 +  public Integer getSrcPID() {
-     return srcPID;
++    return srcId.getPid();
 +  }
 +
 +  public Integer getTargetPID() {
-     return targetPID;
++    return targetId.getPid();
 +  }
  }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2495601c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --cc tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index 7cfa478,4f3976e..f964302
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@@ -14,15 -14,11 +14,16 @@@
  
  package org.apache.tajo.engine.planner.global;
  
 +import com.google.common.annotations.VisibleForTesting;
  import org.apache.tajo.ExecutionBlockId;
++import org.apache.tajo.engine.planner.LogicalPlan.PIDFactory;
  import org.apache.tajo.engine.planner.enforce.Enforcer;
 -import org.apache.tajo.engine.planner.logical.*;
 +import org.apache.tajo.engine.planner.logical.LogicalNode;
 +import org.apache.tajo.engine.planner.logical.LogicalRootNode;
  
 -import java.util.*;
 +import java.util.Collection;
 +import java.util.HashSet;
 +import java.util.Set;
  
  /**
   * A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
@@@ -33,17 -29,16 +34,17 @@@
   */
  public class ExecutionBlock {
    private ExecutionBlockId executionBlockId;
 -  private LogicalNode plan = null;
 -  private StoreTableNode store = null;
 -  private List<ScanNode> scanlist = new ArrayList<ScanNode>();
 +  private ExecutionPlan executionPlan;
    private Enforcer enforcer = new Enforcer();
  
 -  private boolean hasJoinPlan;
 -  private boolean hasUnionPlan;
 -
    private Set<String> broadcasted = new HashSet<String>();
  
-   public ExecutionBlock(ExecutionBlockId executionBlockId, LogicalRootNode rootNode) {
++  public ExecutionBlock(ExecutionBlockId executionBlockId, PIDFactory pidFactory, LogicalRootNode rootNode) {
 +    this.executionBlockId = executionBlockId;
-     this.executionPlan = new ExecutionPlan(rootNode);
++    this.executionPlan = new ExecutionPlan(pidFactory, rootNode);
 +  }
 +
 +  @VisibleForTesting
    public ExecutionBlock(ExecutionBlockId executionBlockId) {
      this.executionBlockId = executionBlockId;
    }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2495601c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockPID.java
----------------------------------------------------------------------
diff --cc tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockPID.java
index 0000000,0000000..d120057
new file mode 100644
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockPID.java
@@@ -1,0 -1,0 +1,43 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.engine.planner.global;
++
++import org.apache.tajo.ExecutionBlockId;
++
++public class ExecutionBlockPID { // Pairs an execution block id with a PID.
++  private ExecutionBlockId executionBlockId; // set once by the constructor; there is no setter
++  private Integer pid; // may be null; presumably the PID of a logical node in that block (TODO confirm)
++
++  public ExecutionBlockPID(ExecutionBlockId executionBlockId, Integer pid) { // stores both values as given
++    this.executionBlockId = executionBlockId;
++    this.pid = pid;
++  }
++
++  public ExecutionBlockId getExecutionBlockId() { // returns the id passed to the constructor
++    return executionBlockId;
++  }
++
++  public void updatePid(Integer pid) { // replaces the PID only; the execution block id is left as is
++    this.pid = pid;
++  }
++
++  public Integer getPid() { // returns the current PID, which can be null
++    return pid;
++  }
++}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2495601c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
----------------------------------------------------------------------
diff --cc tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
index 1c02d70,0000000..ee42eef
mode 100644,000000..100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
@@@ -1,381 -1,0 +1,393 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.tajo.engine.planner.global;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Preconditions;
 +import com.google.gson.annotations.Expose;
 +import org.apache.tajo.catalog.Schema;
 +import org.apache.tajo.engine.json.CoreGsonHelper;
++import org.apache.tajo.engine.planner.LogicalPlan.PIDFactory;
 +import org.apache.tajo.engine.planner.PlannerUtil;
 +import org.apache.tajo.engine.planner.PlanningException;
 +import org.apache.tajo.engine.planner.global.ExecutionPlanEdge.Tag;
 +import org.apache.tajo.engine.planner.graph.SimpleDirectedGraph;
 +import org.apache.tajo.engine.planner.logical.*;
 +import org.apache.tajo.json.GsonObject;
 +
 +import java.util.*;
 +import java.util.Map.Entry;
 +
 +/**
 + * ExecutionPlan is a DAG of logical nodes.
 + * If there are two input sources, the plan should include Join or Union.
 + * The terminalNode is used as the start position of the traversal, because there are multiple output destinations.
 + */
 +public class ExecutionPlan implements GsonObject {
++  @Expose private PIDFactory pidFactory;
 +  @Expose private InputContext inputContext;
 +  @Expose private boolean hasUnionPlan;
 +  @Expose private boolean hasJoinPlan;
 +  @Expose private LogicalRootNode terminalNode;
 +  @Expose private Map<Integer, LogicalNode> vertices = new HashMap<Integer, LogicalNode>();
 +  @Expose private SimpleDirectedGraph<Integer, ExecutionPlanEdge> graph
 +      = new SimpleDirectedGraph<Integer, ExecutionPlanEdge>();
 +
 +  @VisibleForTesting
-   public ExecutionPlan() {
- 
++  public ExecutionPlan(PIDFactory pidFactory) {
++    this.pidFactory = pidFactory;
 +  }
 +
-   public ExecutionPlan(LogicalRootNode terminalNode) {
-     this.terminalNode = PlannerUtil.clone(terminalNode);
++  public ExecutionPlan(PIDFactory pidFactory, LogicalRootNode terminalNode) {
++    this(pidFactory);
++    this.terminalNode = PlannerUtil.clone(pidFactory, terminalNode);
 +  }
 +
 +  public void setPlan(LogicalNode plan) {
 +    this.clear();
 +    this.addPlan(plan);
 +  }
 +
 +  private void clear() {
 +    for (ExecutionPlanEdge edge : graph.getEdgesAll()) {
 +      graph.removeEdge(edge.getChildId(), edge.getParentId());
 +    }
 +    vertices.clear();
 +    this.inputContext = null;
 +    this.hasUnionPlan = false;
 +    this.hasJoinPlan = false;
 +  }
 +
 +  public void addPlan(LogicalNode plan) {
-     LogicalNode current = PlannerUtil.clone(plan);
++    LogicalNode current = PlannerUtil.clone(pidFactory, plan);
 +    if (current.getType() == NodeType.ROOT) {
 +      terminalNode = (LogicalRootNode) current;
 +    } else {
 +      this.add(current, terminalNode, Tag.SINGLE);
 +      terminalNode.setChild(current);
 +    }
 +    ExecutionPlanBuilder builder = new ExecutionPlanBuilder(this);
 +    builder.visit(terminalNode);
 +  }
 +
 +  public void add(LogicalNode child, LogicalNode parent, Tag tag) {
 +    vertices.put(child.getPID(), child);
 +    vertices.put(parent.getPID(), parent);
 +    graph.addEdge(child.getPID(), parent.getPID(), new ExecutionPlanEdge(child, parent, tag));
 +  }
 +
 +  public void setInputContext(InputContext contexts) {
 +    this.inputContext = contexts;
 +  }
 +
 +  public boolean hasJoinPlan() {
 +    return this.hasJoinPlan;
 +  }
 +
 +  public boolean hasUnionPlan() {
 +    return this.hasUnionPlan;
 +  }
 +
 +  public LogicalRootNode getTerminalNode() {
 +    return terminalNode;
 +  }
 +
 +  public InputContext getInputContext() {
 +    return inputContext;
 +  }
 +
 +  public String toString() {
 +    return graph.toStringGraph(terminalNode.getPID());
 +  }
 +
 +  public Tag getTag(LogicalNode child, LogicalNode parent) {
 +    return graph.getEdge(child.getPID(), parent.getPID()).getTag();
 +  }
 +
 +  public LogicalNode getChild(LogicalNode parent, Tag tag) {
 +    List<ExecutionPlanEdge> incomingEdges = graph.getIncomingEdges(parent.getPID());
 +    for (ExecutionPlanEdge inEdge : incomingEdges) {
 +      if (inEdge.getTag() == tag) {
 +        return vertices.get(inEdge.getChildId());
 +      }
 +    }
 +    return null;
 +  }
 +
 +  @Override
 +  public String toJson() {
 +    return CoreGsonHelper.toJson(this, ExecutionPlan.class);
 +  }
 +
 +  public Schema getOutSchema(int i) {
 +    return vertices.get(graph.getChild(terminalNode.getPID(), i)).getOutSchema();
 +  }
 +
 +  @Override
 +  public boolean equals(Object o) {
 +    if (o instanceof ExecutionPlan) {
 +      ExecutionPlan other = (ExecutionPlan) o;
 +      boolean eq = this.hasJoinPlan == other.hasJoinPlan;
 +      eq &= this.hasUnionPlan == other.hasUnionPlan;
 +      eq &= this.terminalNode.equals(other.terminalNode);
 +      eq &= this.inputContext.equals(other.inputContext);
 +      if (!eq) {
 +        return false;
 +      }
 +
 +      ExecutionPlanComparator comparator = new ExecutionPlanComparator(this, other);
 +      eq &= comparator.compare();
 +      return eq;
 +    }
 +    return false;
 +  }
 +
-   public LogicalNode getRootChild(int pid) {
++  public LogicalNode getTopNode(int index) {
++    return vertices.get(getTopNodePid(index));
++  }
++
++  public int getTopNodePid(int index) {
++    return graph.getChild(terminalNode.getPID(), index);
++  }
++
++  public LogicalNode getTopNodeFromPID(int pid) {
 +    for (Integer childId : graph.getChilds(terminalNode.getPID())) {
 +      if (childId == pid) {
 +        return vertices.get(childId);
 +      }
 +    }
 +    return null;
 +  }
 +
 +  public int getChildCount(LogicalNode node) {
 +    return graph.getChildCount(node.getPID());
 +  }
 +
 +  public LogicalNode getChild(LogicalNode node, int i) {
 +    return vertices.get(graph.getChild(node.getPID(), i));
 +  }
 +
 +  public int getParentCount(LogicalNode node) {
 +    return graph.getParentCount(node.getPID());
 +  }
 +
 +  public LogicalNode getParent(LogicalNode node, int i) {
 +    return vertices.get(graph.getParent(node.getPID(), i));
 +  }
 +
 +  public List<LogicalNode> getChilds(LogicalNode node) {
 +    List<LogicalNode> childs = new ArrayList<LogicalNode>();
 +    for (Integer childId : graph.getChilds(node.getPID())) {
 +      childs.add(vertices.get(childId));
 +    }
 +    return childs;
 +  }
 +
 +  public void remove(LogicalNode child, LogicalNode parent) {
 +    this.graph.removeEdge(child.getPID(), parent.getPID());
 +  }
 +
 +  private static class LogicalNodeIdAndTag {
 +    @Expose int id;
 +    @Expose Tag tag;
 +
 +    public LogicalNodeIdAndTag(int id, Tag tag) {
 +      this.id = id;
 +      this.tag = tag;
 +    }
 +  }
 +
 +  public static class ExecutionPlanJsonHelper implements GsonObject {
 +    @Expose private final boolean hasJoinPlan;
 +    @Expose private final boolean hasUnionPlan;
 +    @Expose private final InputContext inputContext;
 +    @Expose private final LogicalRootNode terminalNode;
 +    @Expose Map<Integer, LogicalNode> vertices = new HashMap<Integer, LogicalNode>();
 +    @Expose Map<Integer, List<LogicalNodeIdAndTag>> adjacentList = new HashMap<Integer, List<LogicalNodeIdAndTag>>();
 +
 +    public ExecutionPlanJsonHelper(ExecutionPlan plan) {
 +      this.hasJoinPlan = plan.hasJoinPlan;
 +      this.hasUnionPlan = plan.hasUnionPlan;
 +      this.inputContext = plan.getInputContext();
 +      this.terminalNode = plan.terminalNode;
 +      this.vertices.putAll(plan.vertices);
 +      Collection<ExecutionPlanEdge> edges = plan.graph.getEdgesAll();
 +      int parentId, childId;
 +      List<LogicalNodeIdAndTag> adjacents;
 +
 +      // convert the graph to an adjacent list
 +      for (ExecutionPlanEdge edge : edges) {
 +        childId = edge.getChildId();
 +        parentId = edge.getParentId();
 +
 +        if (adjacentList.containsKey(childId)) {
 +          adjacents = adjacentList.get(childId);
 +        } else {
 +          adjacents = new ArrayList<LogicalNodeIdAndTag>();
 +          adjacentList.put(childId, adjacents);
 +        }
 +        adjacents.add(new LogicalNodeIdAndTag(parentId, edge.getTag()));
 +      }
 +    }
 +
 +    @Override
 +    public String toJson() {
 +      return CoreGsonHelper.toJson(this, ExecutionPlanJsonHelper.class);
 +    }
 +
 +    public ExecutionPlan toExecutionPlan() {
-       ExecutionPlan plan = new ExecutionPlan(this.terminalNode);
++      // TODO: check that it works
++      ExecutionPlan plan = new ExecutionPlan(null, this.terminalNode);
 +      plan.hasJoinPlan = this.hasJoinPlan;
 +      plan.hasUnionPlan = this.hasUnionPlan;
 +      plan.setInputContext(this.inputContext);
 +      plan.vertices.putAll(this.vertices);
 +
 +      for (Entry<Integer, List<LogicalNodeIdAndTag>> e : this.adjacentList.entrySet()) {
 +        LogicalNode child = this.vertices.get(e.getKey());
 +        for (LogicalNodeIdAndTag idAndTag : e.getValue()) {
 +          plan.add(child, this.vertices.get(idAndTag.id), idAndTag.tag);
 +        }
 +      }
 +
 +      return plan;
 +    }
 +  }
 +
 +  /**
 +   * Compares two execution plans structurally by walking both graphs in
 +   * lockstep, starting from their terminal nodes. Two plans are considered
 +   * equal when every pair of nodes visited together has the same PID and the
 +   * same number of children.
 +   */
 +  private static class ExecutionPlanComparator {
 +    ExecutionPlan plan1;
 +    ExecutionPlan plan2;
 +    boolean equal = true;
 +
 +    public ExecutionPlanComparator(ExecutionPlan plan1, ExecutionPlan plan2) {
 +      this.plan1 = plan1;
 +      this.plan2 = plan2;
 +    }
 +
 +    /**
 +     * Runs the comparison from the terminal node of each plan.
 +     *
 +     * @return true if both plans have the same shape and the same PIDs
 +     */
 +    public boolean compare() {
 +      // Reset the flag so that a previous negative result does not stick.
 +      equal = true;
 +      Stack<Integer> s1 = new Stack<Integer>();
 +      Stack<Integer> s2 = new Stack<Integer>();
 +      s1.push(plan1.terminalNode.getPID());
 +      s2.push(plan2.terminalNode.getPID());
 +      return recursiveCompare(s1, s2);
 +    }
 +
 +    /**
 +     * Drains both stacks in lockstep, comparing one pair of PIDs per step and
 +     * pushing the children of matching nodes.
 +     *
 +     * The previous version returned right after pushing the children, so
 +     * nothing below the terminal nodes was ever compared, and it popped from
 +     * an empty stack (EmptyStackException) once a leaf pair was reached.
 +     *
 +     * NOTE(review): children are paired in the order returned by
 +     * graph.getChilds(); this assumes both graphs enumerate children in a
 +     * consistent order -- confirm.
 +     *
 +     * @param s1 pending PIDs of plan1
 +     * @param s2 pending PIDs of plan2
 +     * @return true if every visited pair matched
 +     */
 +    private boolean recursiveCompare(Stack<Integer> s1, Stack<Integer> s2) {
 +      // Both stacks always hold the same number of entries: children are only
 +      // pushed after their counts have been checked to be equal.
 +      while (equal && !s1.isEmpty() && !s2.isEmpty()) {
 +        Integer l1 = s1.pop();
 +        Integer l2 = s2.pop();
 +
 +        if (!l1.equals(l2)) {
 +          equal = false;
 +          break;
 +        }
 +
 +        int childCount = plan1.graph.getChildCount(l1);
 +        if (childCount != plan2.graph.getChildCount(l2)) {
 +          equal = false;
 +          break;
 +        }
 +
 +        // getChilds() is only consulted for nodes that actually have children,
 +        // as in the original code.
 +        if (childCount > 0) {
 +          for (Integer child : plan1.graph.getChilds(l1)) {
 +            s1.push(child);
 +          }
 +          for (Integer child : plan2.graph.getChilds(l2)) {
 +            s2.push(child);
 +          }
 +        }
 +      }
 +      return equal;
 +    }
 +  }
 +
 +  /**
 +   * Visitor that fills an {@link ExecutionPlan} from a logical node tree.
 +   * For every parent/child link it clones the child, records the edge with
 +   * plan.add() and then continues the walk from the clone.
 +   */
 +  private static class ExecutionPlanBuilder implements LogicalNodeVisitor {
 +    private ExecutionPlan plan;
 +
 +    public ExecutionPlanBuilder(ExecutionPlan plan) {
 +      this.plan = plan;
 +    }
 +
 +    /**
 +     * Entry point of the walk. The given node must be a UnaryNode and is
 +     * visited with Tag.SINGLE. A PlanningException raised during the walk is
 +     * rethrown wrapped in a RuntimeException.
 +     */
 +    @Override
 +    public void visit(LogicalNode current) {
 +      try {
 +        Preconditions.checkArgument(current instanceof UnaryNode, "The current node should be an unary node");
 +        visit(current, Tag.SINGLE);
 +      } catch (PlanningException e) {
 +        throw new RuntimeException(e);
 +      }
 +    }
 +
 +    /**
 +     * Dispatches on the concrete node kind. Nodes that match none of the
 +     * four checked types are skipped without any error.
 +     */
 +    private void visit(LogicalNode current, Tag tag) throws PlanningException {
 +      if (current instanceof UnaryNode) {
 +        visitUnary((UnaryNode) current, tag);
 +      } else if (current instanceof BinaryNode) {
 +        visitBinary((BinaryNode) current, tag);
 +      } else if (current instanceof ScanNode) {
 +        visitScan((ScanNode) current, tag);
 +      } else if (current instanceof TableSubQueryNode) {
 +        visitTableSubQuery((TableSubQueryNode) current, tag);
 +      }
 +    }
 +
 +    /**
 +     * Registers the scan node in the plan's input context, creating the
 +     * context on first use. The tag is not used here.
 +     */
 +    private void visitScan(ScanNode node, Tag tag) throws PlanningException {
 +      if (plan.inputContext == null) {
 +        plan.inputContext = new InputContext();
 +      }
 +      plan.inputContext.addScanNode(node);
 +    }
 +
 +    /**
 +     * Clones the child (if any), adds the child-to-node edge with the given
 +     * tag, clears the node's own child reference and continues from the clone.
 +     */
 +    private void visitUnary(UnaryNode node, Tag tag) throws PlanningException {
 +      if (node.getChild() != null) {
-         LogicalNode child = PlannerUtil.clone(node.getChild());
++        LogicalNode child = PlannerUtil.clone(plan.pidFactory, node.getChild());
 +        plan.add(child, node, tag);
 +        // The link is now kept by the plan, so the in-node pointer is dropped.
 +        node.setChild(null);
 +        visit(child, tag);
 +      }
 +    }
 +
 +    /**
 +     * Handles a two-child node. Only accepted when reached with Tag.SINGLE.
 +     * Sets the plan's join/union flag according to the node type, then
 +     * processes the left and right children like visitUnary does, tagging the
 +     * edges (and the continued walk) with Tag.LEFT and Tag.RIGHT.
 +     */
 +    private void visitBinary(BinaryNode node, Tag tag) throws PlanningException {
 +      Preconditions.checkArgument(tag == Tag.SINGLE);
 +
 +      LogicalNode child;
 +      if (node.getType() == NodeType.JOIN) {
 +        plan.hasJoinPlan = true;
 +      } else if (node.getType() == NodeType.UNION) {
 +        plan.hasUnionPlan = true;
 +      }
 +      if (node.getLeftChild() != null) {
-         child = PlannerUtil.clone(node.getLeftChild());
++        child = PlannerUtil.clone(plan.pidFactory, node.getLeftChild());
 +        plan.add(child, node, Tag.LEFT);
 +        node.setLeftChild(null);
 +        visit(child, Tag.LEFT);
 +      }
 +      if (node.getRightChild() != null) {
-         child = PlannerUtil.clone(node.getRightChild());
++        child = PlannerUtil.clone(plan.pidFactory, node.getRightChild());
 +        plan.add(child, node, Tag.RIGHT);
 +        node.setRightChild(null);
 +        visit(child, Tag.RIGHT);
 +      }
 +    }
 +
 +    /**
 +     * Clones the sub-query root, adds the edge and continues from the clone.
 +     * Unlike the unary/binary cases, the node's sub-query reference is left
 +     * in place and no null check is made on it.
 +     */
 +    private void visitTableSubQuery(TableSubQueryNode node, Tag tag) throws PlanningException {
-       LogicalNode child = PlannerUtil.clone(node.getSubQuery());
++      LogicalNode child = PlannerUtil.clone(plan.pidFactory, node.getSubQuery());
 +      plan.add(child, node, tag);
 +      visit(child, tag);
 +    }
 +  }
 +}