You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2023/01/30 03:27:02 UTC

[asterixdb] 13/30: [ASTERIXDB-3075][COMP] Fix ExtractCommonOperatorsRule producing invalid plan

This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit f88ee76e089bc9f04edf8b2e0e7be63bfe80b0ca
Author: Ali Alsuliman <al...@gmail.com>
AuthorDate: Thu Sep 15 10:17:07 2022 -0700

    [ASTERIXDB-3075][COMP] Fix ExtractCommonOperatorsRule producing invalid plan
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    Reset the states of the variables used for calculating
    the query plan cluster of activities after each modification
    to the plan.
    
    Change-Id: If7be34f4262ea9f52fc9a40526909c0afb9091f0
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17227
    Reviewed-by: Ali Alsuliman <al...@gmail.com>
    Reviewed-by: Wail Alkowaileet <wa...@gmail.com>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17330
    Reviewed-by: Michael Blow <mb...@apache.org>
    Tested-by: Michael Blow <mb...@apache.org>
---
 .../extract-common-operators.01.sqlpp              | 49 +++++++++++
 .../extract-common-operators.01.plan               | 98 ++++++++++++++++++++++
 .../metadata_only_02.3.query.sqlpp                 | 49 +++++++++++
 .../metadata_only_02/metadata_only_02.9.ddl.sqlpp  | 21 +++++
 .../misc/metadata_only_02/metadata_only_02.3.adm   |  2 +
 .../rewriter/rules/ExtractCommonOperatorsRule.java | 65 +++++++-------
 6 files changed, 250 insertions(+), 34 deletions(-)

diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/extract-common-operators/extract-common-operators.01.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/extract-common-operators/extract-common-operators.01.sqlpp
new file mode 100644
index 0000000000..11c7325d94
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/extract-common-operators/extract-common-operators.01.sqlpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * test fix for ASTERIXDB-3075
+ */
+
+LET dv_name = ["test2", "test1"], ds_name = ["ds2", "ds1"], syn_name = [ "syn2", "syn1" ],
+synonym_names = (SELECT s.SynonymName, s.ObjectName
+                 FROM Metadata.`Synonym` s, syn_name
+                 WHERE s.SynonymName = syn_name),
+
+dataset_ds_dv_names = (SELECT d.DatasetName, d.DataverseName
+                       FROM Metadata.`Dataset` d, (SELECT * FROM ds_name, dv_name) AS ds_dv_names
+                       WHERE d.DatasetName = ds_dv_names.ds_name AND d.DataverseName = ds_dv_names.dv_name),
+
+dataset_dv_ds_names = (SELECT d.DatasetName, d.DataverseName
+                       FROM Metadata.`Dataset` d, (SELECT * FROM dv_name, ds_name) AS dv_ds_names
+                       WHERE d.DatasetName = dv_ds_names.ds_name AND d.DataverseName = dv_ds_names.dv_name),
+
+left_branch = (SELECT s.SynonymName, s.DataverseName, s.ObjectName
+               FROM Metadata.`Synonym` s LEFT OUTER JOIN dataset_dv_ds_names
+               ON dataset_dv_ds_names.DatasetName = s.ObjectName
+               ORDER BY s.DataverseName, s.SynonymName),
+
+right_branch = (SELECT synonym_names.SynonymName
+                FROM synonym_names LEFT OUTER JOIN dataset_ds_dv_names
+                ON dataset_ds_dv_names.DatasetName = synonym_names.ObjectName)
+
+FROM left_branch lb LEFT OUTER JOIN right_branch rb
+ON lb.ObjectName = rb.SynonymName
+SELECT lb.DataverseName, lb.SynonymName
+ORDER BY lb.DataverseName, lb.SynonymName;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/extract-common-operators/extract-common-operators.01.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/extract-common-operators/extract-common-operators.01.plan
new file mode 100644
index 0000000000..677a45f92d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/extract-common-operators/extract-common-operators.01.plan
@@ -0,0 +1,98 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$381(ASC), $$382(ASC) ]  |PARTITIONED|
+          -- STABLE_SORT [$$381(ASC), $$382(ASC)]  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- STREAM_PROJECT  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- HYBRID_HASH_JOIN [$$390][$$386]  |PARTITIONED|
+                    -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$381(ASC), $$382(ASC)] HASH:[$$390]  |PARTITIONED|
+                      -- STABLE_SORT [$$381(ASC), $$382(ASC)]  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- HYBRID_HASH_JOIN [$$390][$$384]  |PARTITIONED|
+                                -- HASH_PARTITION_EXCHANGE [$$390]  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ASSIGN  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ASSIGN  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- REPLICATE  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- DATASOURCE_SCAN (Metadata.Synonym)  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                -- HASH_PARTITION_EXCHANGE [$$384]  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- HYBRID_HASH_JOIN [$$384, $$383][$$ds_name, $$dv_name]  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- REPLICATE  |PARTITIONED|
+                                            -- HASH_PARTITION_EXCHANGE [$$384, $$383]  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- DATASOURCE_SCAN (Metadata.Dataset)  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                        -- HASH_PARTITION_EXCHANGE [$$ds_name, $$dv_name]  |PARTITIONED|
+                                          -- NESTED_LOOP  |UNPARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                              -- UNNEST  |UNPARTITIONED|
+                                                -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                              -- REPLICATE  |UNPARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                  -- UNNEST  |UNPARTITIONED|
+                                                    -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                    -- HASH_PARTITION_EXCHANGE [$$386]  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- HYBRID_HASH_JOIN [$$404][$$388]  |PARTITIONED|
+                            -- HASH_PARTITION_EXCHANGE [$$404]  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- HYBRID_HASH_JOIN [$$386][$$syn_name]  |PARTITIONED|
+                                    -- HASH_PARTITION_EXCHANGE [$$386]  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ASSIGN  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- REPLICATE  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- DATASOURCE_SCAN (Metadata.Synonym)  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                    -- HASH_PARTITION_EXCHANGE [$$syn_name]  |PARTITIONED|
+                                      -- UNNEST  |UNPARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                            -- HASH_PARTITION_EXCHANGE [$$388]  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- HYBRID_HASH_JOIN [$$388, $$387][$$ds_name, $$dv_name]  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ASSIGN  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- REPLICATE  |PARTITIONED|
+                                              -- HASH_PARTITION_EXCHANGE [$$384, $$383]  |PARTITIONED|
+                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- DATASOURCE_SCAN (Metadata.Dataset)  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                    -- HASH_PARTITION_EXCHANGE [$$ds_name, $$dv_name]  |PARTITIONED|
+                                      -- NESTED_LOOP  |UNPARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                          -- STREAM_PROJECT  |UNPARTITIONED|
+                                            -- ASSIGN  |UNPARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                -- REPLICATE  |UNPARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                    -- UNNEST  |UNPARTITIONED|
+                                                      -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                          -- UNNEST  |UNPARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/metadata_only_02/metadata_only_02.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/metadata_only_02/metadata_only_02.3.query.sqlpp
new file mode 100644
index 0000000000..11c7325d94
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/metadata_only_02/metadata_only_02.3.query.sqlpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * test fix for ASTERIXDB-3075
+ */
+
+LET dv_name = ["test2", "test1"], ds_name = ["ds2", "ds1"], syn_name = [ "syn2", "syn1" ],
+synonym_names = (SELECT s.SynonymName, s.ObjectName
+                 FROM Metadata.`Synonym` s, syn_name
+                 WHERE s.SynonymName = syn_name),
+
+dataset_ds_dv_names = (SELECT d.DatasetName, d.DataverseName
+                       FROM Metadata.`Dataset` d, (SELECT * FROM ds_name, dv_name) AS ds_dv_names
+                       WHERE d.DatasetName = ds_dv_names.ds_name AND d.DataverseName = ds_dv_names.dv_name),
+
+dataset_dv_ds_names = (SELECT d.DatasetName, d.DataverseName
+                       FROM Metadata.`Dataset` d, (SELECT * FROM dv_name, ds_name) AS dv_ds_names
+                       WHERE d.DatasetName = dv_ds_names.ds_name AND d.DataverseName = dv_ds_names.dv_name),
+
+left_branch = (SELECT s.SynonymName, s.DataverseName, s.ObjectName
+               FROM Metadata.`Synonym` s LEFT OUTER JOIN dataset_dv_ds_names
+               ON dataset_dv_ds_names.DatasetName = s.ObjectName
+               ORDER BY s.DataverseName, s.SynonymName),
+
+right_branch = (SELECT synonym_names.SynonymName
+                FROM synonym_names LEFT OUTER JOIN dataset_ds_dv_names
+                ON dataset_ds_dv_names.DatasetName = synonym_names.ObjectName)
+
+FROM left_branch lb LEFT OUTER JOIN right_branch rb
+ON lb.ObjectName = rb.SynonymName
+SELECT lb.DataverseName, lb.SynonymName
+ORDER BY lb.DataverseName, lb.SynonymName;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/metadata_only_02/metadata_only_02.9.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/metadata_only_02/metadata_only_02.9.ddl.sqlpp
new file mode 100644
index 0000000000..bd99c9dc45
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/metadata_only_02/metadata_only_02.9.ddl.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test1 if exists;
+drop dataverse test2 if exists;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/metadata_only_02/metadata_only_02.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/metadata_only_02/metadata_only_02.3.adm
new file mode 100644
index 0000000000..743bb8ab39
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/metadata_only_02/metadata_only_02.3.adm
@@ -0,0 +1,2 @@
+{ "DataverseName": "test1", "SynonymName": "syn1" }
+{ "DataverseName": "test2", "SynonymName": "syn2" }
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index bd24644dd1..fba095aae9 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -99,7 +99,7 @@ public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
                 changed = false;
                 // applying the rewriting until fixpoint
                 topDownMaterialization(roots);
-                genCandidates(context);
+                genCandidates();
                 removeTrivialShare();
                 if (!equivalenceClasses.isEmpty()) {
                     changed = rewrite(context);
@@ -110,9 +110,7 @@ public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
                 equivalenceClasses.clear();
                 childrenToParents.clear();
                 opToCandidateInputs.clear();
-                clusterMap.clear();
-                clusterWaitForMap.clear();
-                lastUsedClusterId = 0; // Resets lastUsedClusterId to 0.
+                resetPlanClusterState();
             } while (changed);
             roots.clear();
         }
@@ -151,7 +149,7 @@ public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
 
     private boolean rewriteForOneEquivalentClass(List<Mutable<ILogicalOperator>> members, IOptimizationContext context)
             throws AlgebricksException {
-        List<Mutable<ILogicalOperator>> group = new ArrayList<Mutable<ILogicalOperator>>();
+        List<Mutable<ILogicalOperator>> group = new ArrayList<>();
         boolean rewritten = false;
         while (members.size() > 0) {
             group.clear();
@@ -176,14 +174,14 @@ public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
             AbstractLogicalOperator aopCandidate = (AbstractLogicalOperator) candidate.getValue();
             List<Mutable<ILogicalOperator>> originalCandidateParents = childrenToParents.get(candidate);
 
-            rop.setExecutionMode(((AbstractLogicalOperator) candidate.getValue()).getExecutionMode());
+            rop.setExecutionMode(candidate.getValue().getExecutionMode());
             if (aopCandidate.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
                 rop.getInputs().add(candidate);
             } else {
                 AbstractLogicalOperator beforeExchange = new ExchangeOperator();
                 beforeExchange.setPhysicalOperator(new OneToOneExchangePOperator());
                 beforeExchange.setExecutionMode(rop.getExecutionMode());
-                Mutable<ILogicalOperator> beforeExchangeRef = new MutableObject<ILogicalOperator>(beforeExchange);
+                Mutable<ILogicalOperator> beforeExchangeRef = new MutableObject<>(beforeExchange);
                 beforeExchange.getInputs().add(candidate);
                 context.computeAndSetTypeEnvironmentForOperator(beforeExchange);
                 beforeExchange.recomputeSchema();
@@ -202,7 +200,7 @@ public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
                     AbstractLogicalOperator exchange = new ExchangeOperator();
                     exchange.setPhysicalOperator(new OneToOneExchangePOperator());
                     exchange.setExecutionMode(rop.getExecutionMode());
-                    MutableObject<ILogicalOperator> exchangeRef = new MutableObject<ILogicalOperator>(exchange);
+                    MutableObject<ILogicalOperator> exchangeRef = new MutableObject<>(exchange);
                     exchange.getInputs().add(new MutableObject<>(rop));
                     rop.getOutputs().add(exchangeRef);
                     context.computeAndSetTypeEnvironmentForOperator(exchange);
@@ -246,10 +244,10 @@ public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
                 exchOp.setPhysicalOperator(new OneToOneExchangePOperator());
                 exchOp.setExecutionMode(rop.getExecutionMode());
                 exchOp.getInputs().add(new MutableObject<>(rop));
-                MutableObject<ILogicalOperator> exchOpRef = new MutableObject<ILogicalOperator>(exchOp);
+                MutableObject<ILogicalOperator> exchOpRef = new MutableObject<>(exchOp);
                 rop.getOutputs().add(exchOpRef);
                 assignOperator.getInputs().add(exchOpRef);
-                projectOperator.getInputs().add(new MutableObject<ILogicalOperator>(assignOperator));
+                projectOperator.getInputs().add(new MutableObject<>(assignOperator));
 
                 // set the types
                 context.computeAndSetTypeEnvironmentForOperator(exchOp);
@@ -267,15 +265,15 @@ public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
                             parentOp.getOperatorTag() == LogicalOperatorTag.PROJECT ? assignOperator : projectOperator;
                     if (parentOp.getPhysicalOperator().isMicroOperator()
                             || parentOp.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
-                        parentOp.getInputs().set(index, new MutableObject<ILogicalOperator>(childOp));
+                        parentOp.getInputs().set(index, new MutableObject<>(childOp));
                     } else {
                         // If the parent operator is a hyracks operator,
                         // an extra one-to-one exchange is needed.
                         AbstractLogicalOperator exchg = new ExchangeOperator();
                         exchg.setPhysicalOperator(new OneToOneExchangePOperator());
                         exchg.setExecutionMode(childOp.getExecutionMode());
-                        exchg.getInputs().add(new MutableObject<ILogicalOperator>(childOp));
-                        parentOp.getInputs().set(index, new MutableObject<ILogicalOperator>(exchg));
+                        exchg.getInputs().add(new MutableObject<>(childOp));
+                        parentOp.getInputs().set(index, new MutableObject<>(exchg));
                         context.computeAndSetTypeEnvironmentForOperator(exchg);
                         exchg.recomputeSchema();
                     }
@@ -285,10 +283,17 @@ public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
             }
             cleanupPlan();
             rewritten = true;
+            resetPlanClusterState();
         }
         return rewritten;
     }
 
+    private void resetPlanClusterState() {
+        clusterMap.clear();
+        clusterWaitForMap.clear();
+        lastUsedClusterId = 0;
+    }
+
     /**
      * Cleans up the plan after combining similar branches into one branch making sure parents & children point to
      * each other correctly.
@@ -348,17 +353,15 @@ public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
         }
     }
 
-    private void genCandidates(IOptimizationContext context) throws AlgebricksException {
-        List<List<Mutable<ILogicalOperator>>> previousEquivalenceClasses =
-                new ArrayList<List<Mutable<ILogicalOperator>>>();
+    private void genCandidates() throws AlgebricksException {
+        List<List<Mutable<ILogicalOperator>>> previousEquivalenceClasses = new ArrayList<>();
         while (equivalenceClasses.size() > 0) {
             previousEquivalenceClasses.clear();
             for (List<Mutable<ILogicalOperator>> candidates : equivalenceClasses) {
-                List<Mutable<ILogicalOperator>> candidatesCopy = new ArrayList<Mutable<ILogicalOperator>>();
-                candidatesCopy.addAll(candidates);
+                List<Mutable<ILogicalOperator>> candidatesCopy = new ArrayList<>(candidates);
                 previousEquivalenceClasses.add(candidatesCopy);
             }
-            List<Mutable<ILogicalOperator>> currentLevelOpRefs = new ArrayList<Mutable<ILogicalOperator>>();
+            List<Mutable<ILogicalOperator>> currentLevelOpRefs = new ArrayList<>();
             for (List<Mutable<ILogicalOperator>> candidates : equivalenceClasses) {
                 if (candidates.size() > 0) {
                     for (Mutable<ILogicalOperator> opRef : candidates) {
@@ -376,11 +379,11 @@ public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
             if (currentLevelOpRefs.size() == 0) {
                 break;
             }
-            prune(context);
+            prune();
         }
         if (equivalenceClasses.size() < 1 && previousEquivalenceClasses.size() > 0) {
             equivalenceClasses.addAll(previousEquivalenceClasses);
-            prune(context);
+            prune();
         }
     }
 
@@ -412,8 +415,7 @@ public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
     }
 
     private void candidatesGrow(List<Mutable<ILogicalOperator>> opList, List<Mutable<ILogicalOperator>> candidates) {
-        List<Mutable<ILogicalOperator>> previousCandidates = new ArrayList<Mutable<ILogicalOperator>>();
-        previousCandidates.addAll(candidates);
+        List<Mutable<ILogicalOperator>> previousCandidates = new ArrayList<>(candidates);
         candidates.clear();
         boolean validCandidate = false;
         for (Mutable<ILogicalOperator> op : opList) {
@@ -450,23 +452,18 @@ public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
         }
     }
 
-    private void prune(IOptimizationContext context) throws AlgebricksException {
-        List<List<Mutable<ILogicalOperator>>> previousEquivalenceClasses =
-                new ArrayList<List<Mutable<ILogicalOperator>>>();
+    private void prune() throws AlgebricksException {
+        List<List<Mutable<ILogicalOperator>>> previousEquivalenceClasses = new ArrayList<>();
         for (List<Mutable<ILogicalOperator>> candidates : equivalenceClasses) {
-            List<Mutable<ILogicalOperator>> candidatesCopy = new ArrayList<Mutable<ILogicalOperator>>();
-            candidatesCopy.addAll(candidates);
+            List<Mutable<ILogicalOperator>> candidatesCopy = new ArrayList<>(candidates);
             previousEquivalenceClasses.add(candidatesCopy);
         }
         equivalenceClasses.clear();
         for (List<Mutable<ILogicalOperator>> candidates : previousEquivalenceClasses) {
             boolean[] reserved = new boolean[candidates.size()];
-            for (int i = 0; i < reserved.length; i++) {
-                reserved[i] = false;
-            }
             for (int i = candidates.size() - 1; i >= 0; i--) {
-                if (reserved[i] == false) {
-                    List<Mutable<ILogicalOperator>> equivalentClass = new ArrayList<Mutable<ILogicalOperator>>();
+                if (!reserved[i]) {
+                    List<Mutable<ILogicalOperator>> equivalentClass = new ArrayList<>();
                     ILogicalOperator candidate = candidates.get(i).getValue();
                     equivalentClass.add(candidates.get(i));
                     for (int j = i - 1; j >= 0; j--) {
@@ -502,7 +499,7 @@ public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
         boolean worthMaterialization = worthMaterialization(group.get(0));
         boolean requiresMaterialization;
         // get clusterIds for each candidate in the group
-        List<Integer> groupClusterIds = new ArrayList<Integer>(group.size());
+        List<Integer> groupClusterIds = new ArrayList<>(group.size());
         for (int i = 0; i < group.size(); i++) {
             groupClusterIds.add(clusterMap.get(group.get(i)).getValue());
         }