You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2018/04/19 18:44:35 UTC

hive git commit: HIVE-19009 : Retain and use runtime statistics during hs2 lifetime (Zoltan Haindrich via Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master 046bc646b -> 9f15e22f4


HIVE-19009 : Retain and use runtime statistics during hs2 lifetime (Zoltan Haindrich via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9f15e22f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9f15e22f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9f15e22f

Branch: refs/heads/master
Commit: 9f15e22f4aea99891a37aa1e54d490921e6e1174
Parents: 046bc64
Author: Zoltan Haindrich <ki...@rxd.hu>
Authored: Tue Apr 3 08:51:00 2018 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Thu Apr 19 11:44:04 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  11 +-
 .../test/resources/testconfiguration.properties |   1 +
 .../org/apache/hadoop/hive/ql/QTestUtil.java    |  10 +-
 .../java/org/apache/hadoop/hive/ql/Context.java |  12 +-
 .../java/org/apache/hadoop/hive/ql/Driver.java  |   4 +
 .../hive/ql/optimizer/physical/Vectorizer.java  |  12 +-
 .../apache/hadoop/hive/ql/plan/JoinDesc.java    |   2 +-
 .../hive/ql/plan/mapper/CachingStatsSource.java |  68 +++++++++
 .../hive/ql/plan/mapper/EmptyStatsSource.java   |  11 ++
 .../plan/mapper/SimpleRuntimeStatsSource.java   |   6 +
 .../hadoop/hive/ql/plan/mapper/StatsSource.java |   5 +-
 .../hive/ql/plan/mapper/StatsSources.java       | 122 ++++++++++++++++
 .../hive/ql/reexec/IReExecutionPlugin.java      |   1 +
 .../hadoop/hive/ql/reexec/ReExecDriver.java     |  20 ++-
 .../ql/reexec/ReExecutionOverlayPlugin.java     |   4 +
 .../hadoop/hive/ql/reexec/ReOptimizePlugin.java |  48 +++++--
 .../signature/TestOperatorSignature.java        |   9 +-
 .../ql/plan/mapping/TestCounterMapping.java     |   1 -
 .../queries/clientpositive/runtime_stats_hs2.q  |  22 +++
 .../clientpositive/llap/runtime_stats_hs2.q.out | 141 +++++++++++++++++++
 20 files changed, 479 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 73492ff..536c7b4 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4263,10 +4263,19 @@ public class HiveConf extends Configuration {
         "comma separated list of plugin can be used:\n"
             + "  overlay: hiveconf subtree 'reexec.overlay' is used as an overlay in case of an execution errors out\n"
             + "  reoptimize: collects operator statistics during execution and recompile the query after a failure"),
+    HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE("hive.query.reexecution.stats.persist.scope", "query",
+        new StringSet("query", "hiveserver", "metastore"),
+        "Sets the persistence scope of runtime statistics\n"
+            + "  query: runtime statistics are only used during re-execution\n"
+            + "  hiveserver: runtime statistics are persisted in the hiveserver - all sessions share it"),
+
     HIVE_QUERY_MAX_REEXECUTION_COUNT("hive.query.reexecution.max.count", 1,
         "Maximum number of re-executions for a single query."),
     HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS("hive.query.reexecution.always.collect.operator.stats", false,
-        "Used during testing"),
+        "If sessionstats are enabled; this option can be used to collect statistics all the time"),
+    HIVE_QUERY_REEXECUTION_STATS_CACHE_SIZE("hive.query.reexecution.stats.cache.size", 100_000,
+        "Size of the runtime statistics cache. Unit is: OperatorStat entry; a query plan consist ~100"),
+
 
     HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true,
         "If the query results cache is enabled. This will keep results of previously executed queries " +

http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index d26f0cc..4e7c519 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -480,6 +480,7 @@ minillaplocal.query.files=\
   retry_failure.q,\
   retry_failure_stat_changes.q,\
   retry_failure_oom.q,\
+  runtime_stats_hs2.q,\
   bucketsortoptimize_insert_2.q,\
   check_constraint.q,\
   cbo_gby.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index fad8c0f..88022be 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -98,6 +98,7 @@ import org.apache.hadoop.hive.ql.parse.ParseDriver;
 import org.apache.hadoop.hive.ql.parse.ParseException;
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.mapper.StatsSources;
 import org.apache.hadoop.hive.ql.processors.CommandProcessor;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -208,7 +209,7 @@ public class QTestUtil {
     getSrcTables().add(table);
     storeSrcTables();
   }
-  
+
   public static Set<String> initSrcTables() {
     if (srcTables == null){
       initSrcTablesFromSystemProperty();
@@ -1066,13 +1067,14 @@ public class QTestUtil {
     clearTablesCreatedDuringTests();
     clearUDFsCreatedDuringTests();
     clearKeysCreatedInTests();
+    StatsSources.clearAllStats();
   }
-  
+
   protected void clearSettingsCreatedInTests() throws IOException {
     getCliDriver().processLine(String.format("set hive.security.authorization.enabled=false;"));
     getCliDriver().processLine(String.format("set user.name=%s;",
         System.getProperty(TEST_HIVE_USER_PROPERTY, "hive_test_user")));
-    
+
     getCliDriver().processLine("set hive.metastore.partition.name.whitelist.pattern=;");
     getCliDriver().processLine("set hive.test.mode=false;");
     getCliDriver().processLine("set hive.mapred.mode=nonstrict;");
@@ -1098,7 +1100,7 @@ public class QTestUtil {
     clearTablesCreatedDuringTests();
     clearUDFsCreatedDuringTests();
     clearKeysCreatedInTests();
-    
+
     cleanupFromFile();
 
     // delete any contents in the warehouse dir

http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 9ca8b00..70846ac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -159,7 +159,7 @@ public class Context {
 
   private boolean isExplainPlan = false;
   private PlanMapper planMapper = new PlanMapper();
-  private StatsSource runtimeStatsSource;
+  private StatsSource statsSource;
   private int executionIndex;
 
   public void setOperation(Operation operation) {
@@ -1047,16 +1047,16 @@ public class Context {
     return planMapper;
   }
 
-  public void setStatsSource(StatsSource runtimeStatsSource) {
-    this.runtimeStatsSource = runtimeStatsSource;
+  public void setStatsSource(StatsSource statsSource) {
+    this.statsSource = statsSource;
   }
 
   public StatsSource getStatsSource() {
-    if (runtimeStatsSource != null) {
-      return runtimeStatsSource;
+    if (statsSource != null) {
+      return statsSource;
     } else {
       // hierarchical; add def stats also here
-      return new EmptyStatsSource();
+      return EmptyStatsSource.INSTANCE;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index f761fff..9cb2ff1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -2748,6 +2748,10 @@ public class Driver implements IDriver {
     this.statsSource = runtimeStatsSource;
   }
 
+  public StatsSource getStatsSource() {
+    return statsSource;
+  }
+
   @Override
   public boolean hasResultSet() {
 

http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index e15c5b7..068f25e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -161,6 +161,7 @@ import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo;
 import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.VectorSelectDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
 import org.apache.hadoop.hive.ql.plan.ptf.OrderExpressionDef;
 import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef;
 import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef;
@@ -375,6 +376,8 @@ public class Vectorizer implements PhysicalPlanResolver {
   private Set<VirtualColumn> availableVectorizedVirtualColumnSet = null;
   private Set<VirtualColumn> neededVirtualColumnSet = null;
 
+  private PlanMapper planMapper;
+
   public class VectorizerCannotVectorizeException extends Exception {
   }
 
@@ -867,7 +870,7 @@ public class Vectorizer implements PhysicalPlanResolver {
   }
 
   private void runDelayedFixups() {
-    for (Entry<Operator<? extends OperatorDesc>, Set<ImmutablePair<Operator<? extends OperatorDesc>, Operator<? extends OperatorDesc>>>> delayed 
+    for (Entry<Operator<? extends OperatorDesc>, Set<ImmutablePair<Operator<? extends OperatorDesc>, Operator<? extends OperatorDesc>>>> delayed
         : delayedFixups.entrySet()) {
       Operator<? extends OperatorDesc> key = delayed.getKey();
       Set<ImmutablePair<Operator<? extends OperatorDesc>, Operator<? extends OperatorDesc>>> value =
@@ -1470,7 +1473,7 @@ public class Vectorizer implements PhysicalPlanResolver {
           enabledConditionsNotMetList.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_ROW_DESERIALIZE.varname);
         }
       }
- 
+
       return false;
     }
 
@@ -2247,6 +2250,7 @@ public class Vectorizer implements PhysicalPlanResolver {
   public PhysicalContext resolve(PhysicalContext physicalContext) throws SemanticException {
 
     hiveConf = physicalContext.getConf();
+    planMapper = physicalContext.getContext().getPlanMapper();
 
     String vectorizationEnabledOverrideString =
         HiveConf.getVar(hiveConf,
@@ -2776,7 +2780,7 @@ public class Vectorizer implements PhysicalPlanResolver {
       }
       if (exprNodeDescList != null) {
         ExprNodeDesc exprNodeDesc = exprNodeDescList.get(0);
-   
+
         if (containsLeadLag(exprNodeDesc)) {
           setOperatorIssue("lead and lag function not supported in argument expression of aggregation function " + functionName);
           return false;
@@ -5019,6 +5023,8 @@ public class Vectorizer implements PhysicalPlanResolver {
 
     LOG.debug("vectorizeOperator " + vectorOp.getClass().getName());
     LOG.debug("vectorizeOperator " + vectorOp.getConf().getClass().getName());
+    // These operators need to be linked to enable runtime statistics to be gathered/used correctly
+    planMapper.link(op, vectorOp);
 
     return vectorOp;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
index 5b7f4c3..e7ca7f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
@@ -556,10 +556,10 @@ public class JoinDesc extends AbstractOperatorDesc {
   }
 
   protected Map<Integer, String> toCompactString(int[][] filterMap) {
+    filterMap = compactFilter(filterMap);
     if (filterMap == null) {
       return null;
     }
-    filterMap = compactFilter(filterMap);
     Map<Integer, String> result = new LinkedHashMap<Integer, String>();
     for (int i = 0 ; i < filterMap.length; i++) {
       if (filterMap[i] == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java
new file mode 100644
index 0000000..c515276
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
+import org.apache.hadoop.hive.ql.stats.OperatorStats;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+public class CachingStatsSource implements StatsSource {
+
+
+  private final Cache<OpTreeSignature, OperatorStats> cache;
+
+  public CachingStatsSource(HiveConf conf) {
+    int size = conf.getIntVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_CACHE_SIZE);
+    cache = CacheBuilder.newBuilder().maximumSize(size).build();
+  }
+
+  public void put(OpTreeSignature sig, OperatorStats opStat) {
+    cache.put(sig, opStat);
+  }
+
+  @Override
+  public Optional<OperatorStats> lookup(OpTreeSignature treeSig) {
+    return Optional.ofNullable(cache.getIfPresent(treeSig));
+  }
+
+  @Override
+  public boolean canProvideStatsFor(Class<?> clazz) {
+    if (Operator.class.isAssignableFrom(clazz)) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public void putAll(Map<OpTreeSignature, OperatorStats> map) {
+    for (Entry<OpTreeSignature, OperatorStats> entry : map.entrySet()) {
+      put(entry.getKey(), entry.getValue());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
index 72092ce..19df13a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.plan.mapper;
 
+import java.util.Map;
 import java.util.Optional;
 
 import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
@@ -25,6 +26,11 @@ import org.apache.hadoop.hive.ql.stats.OperatorStats;
 
 public class EmptyStatsSource implements StatsSource {
 
+  public static StatsSource INSTANCE = new EmptyStatsSource();
+
+  private EmptyStatsSource() {
+  }
+
   @Override
   public boolean canProvideStatsFor(Class<?> class1) {
     return false;
@@ -35,4 +41,9 @@ public class EmptyStatsSource implements StatsSource {
     return Optional.empty();
   }
 
+  @Override
+  public void putAll(Map<OpTreeSignature, OperatorStats> map) {
+    throw new RuntimeException("This is an empty source!");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
index b5a3c24..3d6c257 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.plan.mapper;
 
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
 
@@ -56,4 +57,9 @@ public class SimpleRuntimeStatsSource implements StatsSource {
     return false;
   }
 
+  @Override
+  public void putAll(Map<OpTreeSignature, OperatorStats> map) {
+    throw new RuntimeException();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java
index df5aa0c..e8d51c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.plan.mapper;
 
+import java.util.Map;
 import java.util.Optional;
 
 import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
@@ -25,8 +26,10 @@ import org.apache.hadoop.hive.ql.stats.OperatorStats;
 
 public interface StatsSource {
 
-  boolean canProvideStatsFor(Class<?> class1);
+  boolean canProvideStatsFor(Class<?> clazz);
 
   Optional<OperatorStats> lookup(OpTreeSignature treeSig);
 
+  void putAll(Map<OpTreeSignature, OperatorStats> map);
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java
new file mode 100644
index 0000000..a4e33c3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
+import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper.EquivGroup;
+import org.apache.hadoop.hive.ql.stats.OperatorStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class StatsSources {
+
+  public static class MapBackedStatsSource implements StatsSource {
+
+    private Map<OpTreeSignature, OperatorStats> map = new HashMap<>();
+
+    @Override
+    public boolean canProvideStatsFor(Class<?> clazz) {
+      if (Operator.class.isAssignableFrom(clazz)) {
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public Optional<OperatorStats> lookup(OpTreeSignature treeSig) {
+      return Optional.ofNullable(map.get(treeSig));
+    }
+
+    @Override
+    public void putAll(Map<OpTreeSignature, OperatorStats> map) {
+      map.putAll(map);
+    }
+
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(StatsSources.class);
+
+  public static StatsSource getStatsSourceContaining(StatsSource currentStatsSource, PlanMapper pm) {
+    if (currentStatsSource instanceof CachingStatsSource) {
+      CachingStatsSource sessionStatsSource = (CachingStatsSource) currentStatsSource;
+      loadFromPlanMapper(sessionStatsSource, pm);
+      return sessionStatsSource;
+    } else {
+      return new SimpleRuntimeStatsSource(pm);
+    }
+  }
+
+  public static void loadFromPlanMapper(CachingStatsSource sessionStatsSource, PlanMapper pm) {
+    Map<OpTreeSignature, OperatorStats> map = extractStatMapFromPlanMapper(pm);
+    sessionStatsSource.putAll(map);
+  }
+
+
+  private static Map<OpTreeSignature, OperatorStats> extractStatMapFromPlanMapper(PlanMapper pm) {
+    Map<OpTreeSignature, OperatorStats> map = new HashMap<OpTreeSignature, OperatorStats>();
+    Iterator<EquivGroup> it = pm.iterateGroups();
+    while (it.hasNext()) {
+      EquivGroup e = it.next();
+      List<OperatorStats> stat = e.getAll(OperatorStats.class);
+      List<OpTreeSignature> sig = e.getAll(OpTreeSignature.class);
+
+      if (stat.size() > 1 || sig.size() > 1) {
+        StringBuffer sb = new StringBuffer();
+        sb.append(String.format("expected(stat-sig) 1-1, got {}-{} ;", stat.size(), sig.size()));
+        for (OperatorStats s : stat) {
+          sb.append(s);
+          sb.append(";");
+        }
+        for (OpTreeSignature s : sig) {
+          sb.append(s);
+          sb.append(";");
+        }
+        LOG.debug(sb.toString());
+      }
+      if (stat.size() >= 1 && sig.size() >= 1) {
+        map.put(sig.get(0), stat.get(0));
+      }
+    }
+    return map;
+  }
+
+  private static StatsSource globalStatsSource;
+
+  public static StatsSource globalStatsSource(HiveConf conf) {
+    if (globalStatsSource == null) {
+      globalStatsSource = new CachingStatsSource(conf);
+    }
+    return globalStatsSource;
+  }
+
+  @VisibleForTesting
+  public static void clearAllStats() {
+    globalStatsSource = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java
index 2b0d23c..be62fc0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java
@@ -59,6 +59,7 @@ public interface IReExecutionPlugin {
    */
   boolean shouldReExecute(int executionNum, PlanMapper oldPlanMapper, PlanMapper newPlanMapper);
 
+  void afterExecute(PlanMapper planMapper, boolean successfull);
 
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
index 8a5595d..501f0b4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
@@ -156,6 +156,9 @@ public class ReExecDriver implements IDriver {
       LOG.info("Execution #{} of query", executionIndex);
       CommandProcessorResponse cpr = coreDriver.run();
 
+      PlanMapper oldPlanMapper = coreDriver.getPlanMapper();
+      afterExecute(oldPlanMapper, cpr.getResponseCode() == 0);
+
       boolean shouldReExecute = explainReOptimization && executionIndex==1;
       shouldReExecute |= cpr.getResponseCode() != 0 && shouldReExecute();
 
@@ -164,25 +167,34 @@ public class ReExecDriver implements IDriver {
       }
       LOG.info("Preparing to re-execute query");
       prepareToReExecute();
-      PlanMapper oldPlanMapper = coreDriver.getPlanMapper();
       CommandProcessorResponse compile_resp = coreDriver.compileAndRespond(currentQuery);
       if (compile_resp.failed()) {
+        LOG.error("Recompilation of the query failed; this is unexpected.");
         // FIXME: somehow place pointers that re-execution compilation have failed; the query have been successfully compiled before?
         return compile_resp;
       }
 
       PlanMapper newPlanMapper = coreDriver.getPlanMapper();
       if (!explainReOptimization && !shouldReExecuteAfterCompile(oldPlanMapper, newPlanMapper)) {
+        LOG.info("re-running the query would probably not yield better results; returning with last error");
         // FIXME: retain old error; or create a new one?
         return cpr;
       }
     }
   }
 
+  private void afterExecute(PlanMapper planMapper, boolean success) {
+    for (IReExecutionPlugin p : plugins) {
+      p.afterExecute(planMapper, success);
+    }
+  }
+
   private boolean shouldReExecuteAfterCompile(PlanMapper oldPlanMapper, PlanMapper newPlanMapper) {
     boolean ret = false;
     for (IReExecutionPlugin p : plugins) {
-      ret |= p.shouldReExecute(executionIndex, oldPlanMapper, newPlanMapper);
+      boolean shouldReExecute = p.shouldReExecute(executionIndex, oldPlanMapper, newPlanMapper);
+      LOG.debug("{}.shouldReExecuteAfterCompile = {}", p, shouldReExecute);
+      ret |= shouldReExecute;
     }
     return ret;
   }
@@ -190,7 +202,9 @@ public class ReExecDriver implements IDriver {
   private boolean shouldReExecute() {
     boolean ret = false;
     for (IReExecutionPlugin p : plugins) {
-      ret |= p.shouldReExecute(executionIndex);
+      boolean shouldReExecute = p.shouldReExecute(executionIndex);
+      LOG.debug("{}.shouldReExecute = {}", p, shouldReExecute);
+      ret |= shouldReExecute;
     }
     return ret;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java
index 4ee3c14..950903c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java
@@ -80,4 +80,8 @@ public class ReExecutionOverlayPlugin implements IReExecutionPlugin {
   public void beforeExecute(int executionIndex, boolean explainReOptimization) {
   }
 
+  @Override
+  public void afterExecute(PlanMapper planMapper, boolean success) {
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
index f731315..409cc73 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.reexec;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -29,7 +30,8 @@ import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
 import org.apache.hadoop.hive.ql.hooks.HookContext;
 import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
 import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
-import org.apache.hadoop.hive.ql.plan.mapper.SimpleRuntimeStatsSource;
+import org.apache.hadoop.hive.ql.plan.mapper.StatsSource;
+import org.apache.hadoop.hive.ql.plan.mapper.StatsSources;
 import org.apache.hadoop.hive.ql.stats.OperatorStatsReaderHook;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +49,8 @@ public class ReOptimizePlugin implements IReExecutionPlugin {
 
   private OperatorStatsReaderHook statsReaderHook;
 
+  private boolean alwaysCollectStats;
+
   class LocalHook implements ExecuteWithHookContext {
 
     @Override
@@ -62,10 +66,10 @@ public class ReOptimizePlugin implements IReExecutionPlugin {
               if (message.contains("Vertex failed,") && isOOM) {
                 retryPossible = true;
               }
-              System.out.println(exception);
             }
           }
         }
+        LOG.info("ReOptimization: retryPossible: {}", retryPossible);
       }
     }
   }
@@ -77,9 +81,25 @@ public class ReOptimizePlugin implements IReExecutionPlugin {
     statsReaderHook = new OperatorStatsReaderHook();
     coreDriver.getHookRunner().addOnFailureHook(statsReaderHook);
     coreDriver.getHookRunner().addPostHook(statsReaderHook);
-    //    statsReaderHook.setCollectOnSuccess(true);
-    statsReaderHook.setCollectOnSuccess(
-        driver.getConf().getBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS));
+    alwaysCollectStats = driver.getConf().getBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS);
+    statsReaderHook.setCollectOnSuccess(alwaysCollectStats);
+
+    coreDriver.setStatsSource(getStatsSource(driver.getConf()));
+  }
+
+  static enum StatsSourceMode {
+    query, hiveserver;
+  }
+
+  private StatsSource getStatsSource(HiveConf conf) {
+    StatsSourceMode mode = StatsSourceMode.valueOf(conf.getVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE));
+    switch (mode) {
+    case query:
+      return new StatsSources.MapBackedStatsSource();
+    case hiveserver:
+      return StatsSources.globalStatsSource(conf);
+    }
+    throw new RuntimeException("Unknown StatsSource setting: " + mode);
   }
 
   @Override
@@ -90,17 +110,19 @@ public class ReOptimizePlugin implements IReExecutionPlugin {
   @Override
   public void prepareToReExecute() {
     statsReaderHook.setCollectOnSuccess(true);
-    PlanMapper pm = coreDriver.getContext().getPlanMapper();
-    coreDriver.setStatsSource(new SimpleRuntimeStatsSource(pm));
     retryPossible = false;
+    coreDriver.setStatsSource(
+        StatsSources.getStatsSourceContaining(coreDriver.getStatsSource(), coreDriver.getPlanMapper()));
   }
 
   @Override
   public boolean shouldReExecute(int executionNum, PlanMapper oldPlanMapper, PlanMapper newPlanMapper) {
-    return planDidChange(oldPlanMapper, newPlanMapper);
+    boolean planDidChange = !planEquals(oldPlanMapper, newPlanMapper);
+    LOG.info("planDidChange: {}", planDidChange);
+    return planDidChange;
   }
 
-  private boolean planDidChange(PlanMapper pmL, PlanMapper pmR) {
+  private boolean planEquals(PlanMapper pmL, PlanMapper pmR) {
     List<Operator> opsL = getRootOps(pmL);
     List<Operator> opsR = getRootOps(pmR);
     for (Iterator<Operator> itL = opsL.iterator(); itL.hasNext();) {
@@ -135,4 +157,12 @@ public class ReOptimizePlugin implements IReExecutionPlugin {
     }
   }
 
+  @Override
+  public void afterExecute(PlanMapper planMapper, boolean success) {
+    if (alwaysCollectStats) {
+      coreDriver.setStatsSource(
+          StatsSources.getStatsSourceContaining(coreDriver.getStatsSource(), planMapper));
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestOperatorSignature.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestOperatorSignature.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestOperatorSignature.java
index 0afc533..b09aafb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestOperatorSignature.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestOperatorSignature.java
@@ -77,12 +77,16 @@ public class TestOperatorSignature {
     Operator<TableScanDesc> ts = getTsOp(i);
     Operator<? extends OperatorDesc> fil = getFilterOp(j);
 
-    ts.getChildOperators().add(fil);
-    fil.getParentOperators().add(ts);
+    connectOperators(ts, fil);
 
     return fil;
   }
 
+  private void connectOperators(Operator<?> parent, Operator<?> child) {
+    parent.getChildOperators().add(child);
+    child.getParentOperators().add(parent);
+  }
+
   @Test
   public void testTableScand() {
     Operator<TableScanDesc> t1 = getTsOp(3);
@@ -157,4 +161,5 @@ public class TestOperatorSignature {
   }
 
 
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java b/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java
index 18aeb33..8126970 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java
@@ -172,7 +172,6 @@ public class TestCounterMapping {
   }
 
   private static IDriver createDriver() {
-    //    HiveConf conf = new HiveConf(Driver.class);
     HiveConf conf = env_setup.getTestCtx().hiveConf;
     conf.setBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ENABLED, true);
     conf.setBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS, true);

http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/test/queries/clientpositive/runtime_stats_hs2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/runtime_stats_hs2.q b/ql/src/test/queries/clientpositive/runtime_stats_hs2.q
new file mode 100644
index 0000000..34a8dd3
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/runtime_stats_hs2.q
@@ -0,0 +1,22 @@
+
+create table tx(a int,u int);
+insert into tx values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(10,10);
+
+create table px(a int,p int);
+insert into px values (2,2),(3,3),(5,5),(7,7),(11,11);
+
+set hive.explain.user=true;
+set hive.query.reexecution.enabled=true;
+set hive.query.reexecution.always.collect.operator.stats=true;
+set hive.query.reexecution.strategies=overlay,reoptimize;
+set hive.query.reexecution.stats.persist.scope=hiveserver;
+
+-- join output estimate is underestimated: 1 row
+explain
+select sum(u*p) from tx join px on (u=p) where u<10 and p>2;
+
+select sum(u*p) from tx join px on (u=p) where u<10 and p>2;
+
+-- join output estimate is 3 rows ; all the operators stats are "runtime"
+explain
+select sum(u*p) from tx join px on (u=p) where u<10 and p>2;

http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/test/results/clientpositive/llap/runtime_stats_hs2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/runtime_stats_hs2.q.out b/ql/src/test/results/clientpositive/llap/runtime_stats_hs2.q.out
new file mode 100644
index 0000000..4d60b8c
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/runtime_stats_hs2.q.out
@@ -0,0 +1,141 @@
+PREHOOK: query: create table tx(a int,u int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tx
+POSTHOOK: query: create table tx(a int,u int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tx
+PREHOOK: query: insert into tx values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(10,10)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tx
+POSTHOOK: query: insert into tx values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(10,10)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tx
+POSTHOOK: Lineage: tx.a SCRIPT []
+POSTHOOK: Lineage: tx.u SCRIPT []
+PREHOOK: query: create table px(a int,p int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@px
+POSTHOOK: query: create table px(a int,p int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@px
+PREHOOK: query: insert into px values (2,2),(3,3),(5,5),(7,7),(11,11)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@px
+POSTHOOK: query: insert into px values (2,2),(3,3),(5,5),(7,7),(11,11)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@px
+POSTHOOK: Lineage: px.a SCRIPT []
+POSTHOOK: Lineage: px.p SCRIPT []
+PREHOOK: query: explain
+select sum(u*p) from tx join px on (u=p) where u<10 and p>2
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select sum(u*p) from tx join px on (u=p) where u<10 and p>2
+POSTHOOK: type: QUERY
+Plan optimized by CBO.
+
+Vertex dependency in root stage
+Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+
+Stage-0
+  Fetch Operator
+    limit:-1
+    Stage-1
+      Reducer 3 llap
+      File Output Operator [FS_15]
+        Group By Operator [GBY_13] (rows=1 width=8)
+          Output:["_col0"],aggregations:["sum(VALUE._col0)"]
+        <-Reducer 2 [CUSTOM_SIMPLE_EDGE] llap
+          PARTITION_ONLY_SHUFFLE [RS_12]
+            Group By Operator [GBY_11] (rows=1 width=8)
+              Output:["_col0"],aggregations:["sum(_col0)"]
+              Select Operator [SEL_9] (rows=1 width=8)
+                Output:["_col0"]
+                Merge Join Operator [MERGEJOIN_20] (rows=1 width=8)
+                  Conds:RS_6._col0=RS_7._col0(Inner),Output:["_col0","_col1"]
+                <-Map 1 [SIMPLE_EDGE] llap
+                  SHUFFLE [RS_6]
+                    PartitionCols:_col0
+                    Select Operator [SEL_2] (rows=1 width=4)
+                      Output:["_col0"]
+                      Filter Operator [FIL_18] (rows=1 width=4)
+                        predicate:((u < 10) and (u > 2))
+                        TableScan [TS_0] (rows=8 width=4)
+                          default@tx,tx,Tbl:COMPLETE,Col:COMPLETE,Output:["u"]
+                <-Map 4 [SIMPLE_EDGE] llap
+                  SHUFFLE [RS_7]
+                    PartitionCols:_col0
+                    Select Operator [SEL_5] (rows=1 width=4)
+                      Output:["_col0"]
+                      Filter Operator [FIL_19] (rows=1 width=4)
+                        predicate:((p < 10) and (p > 2))
+                        TableScan [TS_3] (rows=5 width=4)
+                          default@px,px,Tbl:COMPLETE,Col:COMPLETE,Output:["p"]
+
+PREHOOK: query: select sum(u*p) from tx join px on (u=p) where u<10 and p>2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@px
+PREHOOK: Input: default@tx
+#### A masked pattern was here ####
+POSTHOOK: query: select sum(u*p) from tx join px on (u=p) where u<10 and p>2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@px
+POSTHOOK: Input: default@tx
+#### A masked pattern was here ####
+83
+PREHOOK: query: explain
+select sum(u*p) from tx join px on (u=p) where u<10 and p>2
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select sum(u*p) from tx join px on (u=p) where u<10 and p>2
+POSTHOOK: type: QUERY
+Plan optimized by CBO.
+
+Vertex dependency in root stage
+Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+
+Stage-0
+  Fetch Operator
+    limit:-1
+    Stage-1
+      Reducer 3 llap
+      File Output Operator [FS_15]
+        Group By Operator [GBY_13] (runtime: rows=1 width=8)
+          Output:["_col0"],aggregations:["sum(VALUE._col0)"]
+        <-Reducer 2 [CUSTOM_SIMPLE_EDGE] llap
+          PARTITION_ONLY_SHUFFLE [RS_12]
+            Group By Operator [GBY_11] (runtime: rows=1 width=8)
+              Output:["_col0"],aggregations:["sum(_col0)"]
+              Select Operator [SEL_9] (runtime: rows=3 width=8)
+                Output:["_col0"]
+                Merge Join Operator [MERGEJOIN_20] (runtime: rows=3 width=8)
+                  Conds:RS_6._col0=RS_7._col0(Inner),Output:["_col0","_col1"]
+                <-Map 1 [SIMPLE_EDGE] llap
+                  SHUFFLE [RS_6]
+                    PartitionCols:_col0
+                    Select Operator [SEL_2] (runtime: rows=5 width=4)
+                      Output:["_col0"]
+                      Filter Operator [FIL_18] (runtime: rows=5 width=4)
+                        predicate:((u < 10) and (u > 2))
+                        TableScan [TS_0] (runtime: rows=8 width=4)
+                          default@tx,tx,Tbl:COMPLETE,Col:COMPLETE,Output:["u"]
+                <-Map 4 [SIMPLE_EDGE] llap
+                  SHUFFLE [RS_7]
+                    PartitionCols:_col0
+                    Select Operator [SEL_5] (runtime: rows=3 width=4)
+                      Output:["_col0"]
+                      Filter Operator [FIL_19] (runtime: rows=3 width=4)
+                        predicate:((p < 10) and (p > 2))
+                        TableScan [TS_3] (runtime: rows=5 width=4)
+                          default@px,px,Tbl:COMPLETE,Col:COMPLETE,Output:["p"]
+