You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kg...@apache.org on 2018/04/24 05:36:15 UTC

[6/6] hive git commit: HIVE-19171: Persist runtime statistics in metastore (Zoltan Haindrich reviewed by Ashutosh Chauhan)

HIVE-19171: Persist runtime statistics in metastore (Zoltan Haindrich reviewed by Ashutosh Chauhan)

Signed-off-by: Zoltan Haindrich <ki...@rxd.hu>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/56c3a957
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/56c3a957
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/56c3a957

Branch: refs/heads/master
Commit: 56c3a95703b9dbb54a0d19b08b4e4a31664b8115
Parents: f552e74
Author: Zoltan Haindrich <ki...@rxd.hu>
Authored: Tue Apr 24 07:28:50 2018 +0200
Committer: Zoltan Haindrich <ki...@rxd.hu>
Committed: Tue Apr 24 07:28:50 2018 +0200

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    6 +-
 .../listener/DummyRawStoreFailEvent.java        |   19 +
 .../org/apache/hadoop/hive/ql/QTestUtil.java    |    2 +-
 .../upgrade/derby/056-HIVE-19171.derby.sql      |   10 +
 .../ql/optimizer/signature/OpSignature.java     |   19 +-
 .../ql/optimizer/signature/OpTreeSignature.java |   24 +-
 .../signature/OpTreeSignatureFactory.java       |   12 +-
 .../ql/optimizer/signature/RuntimeStatsMap.java |   83 +
 .../signature/RuntimeStatsPersister.java        |   54 +
 .../ql/optimizer/signature/SignatureUtils.java  |   22 +-
 .../hadoop/hive/ql/plan/FileSinkDesc.java       |    7 +-
 .../hadoop/hive/ql/plan/HashTableSinkDesc.java  |    6 +-
 .../apache/hadoop/hive/ql/plan/JoinDesc.java    |    6 +-
 .../apache/hadoop/hive/ql/plan/MapJoinDesc.java |    6 +-
 .../hive/ql/plan/mapper/CachingStatsSource.java |    7 +-
 .../hive/ql/plan/mapper/EmptyStatsSource.java   |    2 +-
 .../ql/plan/mapper/MapBackedStatsSource.java    |   50 +
 .../ql/plan/mapper/MetastoreStatsConnector.java |  143 +
 .../hadoop/hive/ql/plan/mapper/PlanMapper.java  |  108 +-
 .../plan/mapper/SimpleRuntimeStatsSource.java   |   65 -
 .../hive/ql/plan/mapper/StatsSources.java       |   86 +-
 .../hadoop/hive/ql/reexec/ReOptimizePlugin.java |   17 +-
 .../hadoop/hive/ql/stats/OperatorStats.java     |   33 +-
 .../signature/TestRuntimeStatsPersistence.java  |  165 +
 .../ql/plan/mapping/TestCounterMapping.java     |    7 +-
 .../ql/plan/mapping/TestReOptimization.java     |   85 +-
 .../apache/hive/service/server/HiveServer2.java |    3 +
 .../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp  | 5376 ++++++++++--------
 .../gen/thrift/gen-cpp/ThriftHiveMetastore.h    |  259 +
 .../ThriftHiveMetastore_server.skeleton.cpp     |   10 +
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp |  376 +-
 .../gen/thrift/gen-cpp/hive_metastore_types.h   |   97 +
 .../metastore/api/GetRuntimeStatsRequest.java   |  283 +
 .../hadoop/hive/metastore/api/RuntimeStat.java  |  600 ++
 .../hive/metastore/api/ThriftHiveMetastore.java | 2584 +++++++--
 .../gen-php/metastore/ThriftHiveMetastore.php   |  481 ++
 .../src/gen/thrift/gen-php/metastore/Types.php  |  171 +
 .../hive_metastore/ThriftHiveMetastore-remote   |   14 +
 .../hive_metastore/ThriftHiveMetastore.py       |  409 ++
 .../gen/thrift/gen-py/hive_metastore/ttypes.py  |  141 +
 .../gen/thrift/gen-rb/hive_metastore_types.rb   |   37 +
 .../gen/thrift/gen-rb/thrift_hive_metastore.rb  |  119 +
 .../hadoop/hive/metastore/HiveMetaStore.java    |  113 +-
 .../hive/metastore/HiveMetaStoreClient.java     |  140 +-
 .../hadoop/hive/metastore/IMetaStoreClient.java |    9 +
 .../hadoop/hive/metastore/ObjectStore.java      |  248 +-
 .../apache/hadoop/hive/metastore/RawStore.java  |   15 +-
 .../hive/metastore/RuntimeStatsCleanerTask.java |   67 +
 .../hive/metastore/cache/CachedStore.java       |   21 +-
 .../hive/metastore/conf/MetastoreConf.java      |   34 +-
 .../hive/metastore/model/MRuntimeStat.java      |   59 +
 .../src/main/resources/package.jdo              |   14 +
 .../main/sql/derby/hive-schema-3.0.0.derby.sql  |   10 +
 .../sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql  |   10 +
 .../main/sql/mssql/hive-schema-3.0.0.mssql.sql  |    9 +
 .../sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql  |    9 +
 .../main/sql/mysql/hive-schema-3.0.0.mysql.sql  |   10 +
 .../sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql  |    9 +
 .../sql/oracle/hive-schema-3.0.0.oracle.sql     |   10 +
 .../oracle/upgrade-2.3.0-to-3.0.0.oracle.sql    |    9 +
 .../sql/postgres/hive-schema-3.0.0.postgres.sql |   11 +
 .../upgrade-2.3.0-to-3.0.0.postgres.sql         |    9 +
 .../src/main/thrift/hive_metastore.thrift       |   12 +
 .../DummyRawStoreControlledCommit.java          |   18 +
 .../DummyRawStoreForJdoConnection.java          |   17 +
 .../HiveMetaStoreClientPreCatalog.java          |   10 +
 .../hive/metastore/client/TestRuntimeStats.java |  100 +
 67 files changed, 9975 insertions(+), 3002 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 2403d7a..f40c606 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4270,15 +4270,15 @@ public class HiveConf extends Configuration {
         new StringSet("query", "hiveserver", "metastore"),
         "Sets the persistence scope of runtime statistics\n"
             + "  query: runtime statistics are only used during re-execution\n"
-            + "  hiveserver: runtime statistics are persisted in the hiveserver - all sessions share it"),
+            + "  hiveserver: runtime statistics are persisted in the hiveserver - all sessions share it\n"
+            + "  metastore: runtime statistics are persisted in the metastore as well"),
 
     HIVE_QUERY_MAX_REEXECUTION_COUNT("hive.query.reexecution.max.count", 1,
         "Maximum number of re-executions for a single query."),
     HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS("hive.query.reexecution.always.collect.operator.stats", false,
         "If sessionstats are enabled; this option can be used to collect statistics all the time"),
     HIVE_QUERY_REEXECUTION_STATS_CACHE_SIZE("hive.query.reexecution.stats.cache.size", 100_000,
-        "Size of the runtime statistics cache. Unit is: OperatorStat entry; a query plan consist ~100"),
-
+        "Size of the runtime statistics cache. Unit is: OperatorStat entry; a query plan consist ~100. See also: runtime.stats.max.entries"),
 
     HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true,
         "If the query results cache is enabled. This will keep results of previously executed queries " +

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 801de7a..8ecbaad 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hive.metastore.api.WMTrigger;
 import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse;
 import org.apache.hadoop.hive.metastore.api.Role;
 import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant;
+import org.apache.hadoop.hive.metastore.api.RuntimeStat;
 import org.apache.hadoop.hive.metastore.api.SQLCheckConstraint;
 import org.apache.hadoop.hive.metastore.api.SQLDefaultConstraint;
 import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
@@ -295,6 +296,7 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
     objectStore.updateCreationMetadata(catName, dbname, tablename, cm);
   }
 
+  @Override
   public void alterTable(String catName, String dbName, String name, Table newTable)
       throws InvalidObjectException, MetaException {
     if (shouldEventSucceed) {
@@ -1126,6 +1128,7 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
     return null;
   }
 
+  @Override
   public void createISchema(ISchema schema) throws AlreadyExistsException, MetaException,
       NoSuchObjectException {
     objectStore.createISchema(schema);
@@ -1195,4 +1198,20 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
   public void addSerde(SerDeInfo serde) throws AlreadyExistsException, MetaException {
     objectStore.addSerde(serde);
   }
+
+  @Override
+  public void addRuntimeStat(RuntimeStat stat) throws MetaException {
+    objectStore.addRuntimeStat(stat);
+  }
+
+  @Override
+  public List<RuntimeStat> getRuntimeStats() throws MetaException {
+    return objectStore.getRuntimeStats();
+  }
+
+  @Override
+  public int deleteRuntimeStats(int maxRetained, int maxRetainSecs) {
+    return 0;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 88022be..750fc69 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -1067,7 +1067,7 @@ public class QTestUtil {
     clearTablesCreatedDuringTests();
     clearUDFsCreatedDuringTests();
     clearKeysCreatedInTests();
-    StatsSources.clearAllStats();
+    StatsSources.clearGlobalStats();
   }
 
   protected void clearSettingsCreatedInTests() throws IOException {

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/metastore/scripts/upgrade/derby/056-HIVE-19171.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/056-HIVE-19171.derby.sql b/metastore/scripts/upgrade/derby/056-HIVE-19171.derby.sql
new file mode 100644
index 0000000..ef6c77b
--- /dev/null
+++ b/metastore/scripts/upgrade/derby/056-HIVE-19171.derby.sql
@@ -0,0 +1,10 @@
+
+
+CREATE TABLE "APP"."RUNTIME_STATS" (
+  "RS_ID" bigint primary key,
+  "CREATE_TIME" integer not null,
+  "WEIGHT" integer not null,
+  "PAYLOAD" BLOB
+);
+
+CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpSignature.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpSignature.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpSignature.java
index e87bbce..f626bd5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpSignature.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpSignature.java
@@ -25,18 +25,28 @@ import java.util.Map.Entry;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
+import com.fasterxml.jackson.annotation.JsonIdentityInfo;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.ObjectIdGenerators;
 import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Signature of the operator(non-recursive).
  */
-public class OpSignature {
+@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id")
+public final class OpSignature {
 
   /**
    * Holds the signature of the operator; the keys are are the methods name marked by {@link Signature}.
    */
+  @JsonProperty
   private Map<String, Object> sigMap;
 
+  // need this for Jackson to work
+  @SuppressWarnings("unused")
+  private OpSignature() {
+  }
+
   private OpSignature(Operator<? extends OperatorDesc> op) {
     sigMap = new HashMap<>();
     // FIXME: consider other operator info as well..not just conf?
@@ -70,7 +80,7 @@ public class OpSignature {
 
   @VisibleForTesting
   public void proveEquals(OpSignature other) {
-    proveEquals(sigMap,other.sigMap);
+    proveEquals(sigMap, other.sigMap);
   }
 
   private static void proveEquals(Map<String, Object> m1, Map<String, Object> m2) {
@@ -103,4 +113,9 @@ public class OpSignature {
     }
     return sb.toString();
   }
+
+  public Map<String, Object> getSigMap() {
+    return sigMap;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignature.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignature.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignature.java
index c3dc848..f774158 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignature.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignature.java
@@ -24,14 +24,28 @@ import java.util.Objects;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
+import com.fasterxml.jackson.annotation.JsonIdentityInfo;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.ObjectIdGenerators;
+
 /**
  * Operator tree signature.
  */
-public class OpTreeSignature {
+@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id")
+public final class OpTreeSignature {
+
+  @JsonProperty
   private int hashCode;
+  @JsonProperty
   private OpSignature sig;
+  @JsonProperty
   private ArrayList<OpTreeSignature> parentSig;
 
+  // need this for Jackson to work
+  @SuppressWarnings("unused")
+  private OpTreeSignature() {
+  }
+
   OpTreeSignature(Operator<?> op, OpTreeSignatureFactory osf) {
     sig = OpSignature.of(op);
     parentSig = new ArrayList<>();
@@ -82,4 +96,12 @@ public class OpTreeSignature {
     return sb.toString();
   }
 
+  public OpSignature getSig() {
+    return sig;
+  }
+
+  public ArrayList<OpTreeSignature> getParentSig() {
+    return parentSig;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignatureFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignatureFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignatureFactory.java
index 3df5ee9..80a3edf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignatureFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignatureFactory.java
@@ -29,22 +29,22 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
  */
 public interface OpTreeSignatureFactory {
 
-  public OpTreeSignature getSignature(Operator<? extends OperatorDesc> op);
+  OpTreeSignature getSignature(Operator<? extends OperatorDesc> op);
 
-  static final OpTreeSignatureFactory DIRECT = new Direct();
+  OpTreeSignatureFactory DIRECT = new Direct();
 
-  public static OpTreeSignatureFactory direct() {
+  static OpTreeSignatureFactory direct() {
     return DIRECT;
   }
 
-  public static OpTreeSignatureFactory newCache() {
+  static OpTreeSignatureFactory newCache() {
     return new CachedFactory();
   }
 
   // FIXME: possible alternative: move both OpSignature/OpTreeSignature into
   // under some class as nested ones; and that way this factory level caching can be made "transparent"
 
-  static class Direct implements OpTreeSignatureFactory {
+  class Direct implements OpTreeSignatureFactory {
 
     @Override
     public OpTreeSignature getSignature(Operator<? extends OperatorDesc> op) {
@@ -53,7 +53,7 @@ public interface OpTreeSignatureFactory {
 
   }
 
-  static class CachedFactory implements OpTreeSignatureFactory {
+  class CachedFactory implements OpTreeSignatureFactory {
 
     Map<Operator<? extends OperatorDesc>, OpTreeSignature> cache = new IdentityHashMap<>();
 

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/RuntimeStatsMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/RuntimeStatsMap.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/RuntimeStatsMap.java
new file mode 100644
index 0000000..195a8b1
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/RuntimeStatsMap.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.signature;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hive.ql.stats.OperatorStats;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Objects;
+
+/**
+ * This class makes it easier for Jackson to comprehend the map type.
+ *
+ * Instead of convincing Jackson to store the map with serializers and type-factory tricks,
+ * this class is a simple "repacker" to and from a pair of parallel lists.
+ */
+public final class RuntimeStatsMap {
+  @JsonProperty
+  private List<OpTreeSignature> sigs;
+  @JsonProperty
+  private List<OperatorStats> ss;
+
+  RuntimeStatsMap() {
+  }
+
+
+  public RuntimeStatsMap(Map<OpTreeSignature, OperatorStats> input) {
+    sigs = new ArrayList<>(input.size());
+    ss = new ArrayList<>(input.size());
+    for (Entry<OpTreeSignature, OperatorStats> ent : input.entrySet()) {
+      sigs.add(ent.getKey());
+      ss.add(ent.getValue());
+    }
+  }
+
+  public Map<OpTreeSignature, OperatorStats> toMap() throws IOException {
+    if (sigs.size() != ss.size()) {
+      throw new IOException("constraint validation");
+    }
+    Map<OpTreeSignature, OperatorStats> ret = new HashMap<>();
+    for (int i = 0; i < sigs.size(); i++) {
+      ret.put(sigs.get(i), ss.get(i));
+    }
+    return ret;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(sigs, ss);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null || obj.getClass() != RuntimeStatsMap.class) {
+      return false;
+    }
+    RuntimeStatsMap o = (RuntimeStatsMap) obj;
+    return Objects.equal(sigs, o.sigs) &&
+        Objects.equal(ss, o.ss);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/RuntimeStatsPersister.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/RuntimeStatsPersister.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/RuntimeStatsPersister.java
new file mode 100644
index 0000000..696fe1f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/RuntimeStatsPersister.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.signature;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
+/**
+ * Encodes and decodes runtime statistics values to and from their textual form.
+ */
+public class RuntimeStatsPersister {
+  public static final RuntimeStatsPersister INSTANCE = new RuntimeStatsPersister();
+
+  private final ObjectMapper om;
+
+  RuntimeStatsPersister() {
+    om = new ObjectMapper();
+    om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
+    om.configure(SerializationFeature.INDENT_OUTPUT, true);
+    om.configure(MapperFeature.REQUIRE_SETTERS_FOR_GETTERS, true);
+  }
+
+  public <T> String encode(T input) throws IOException {
+    return om.writeValueAsString(input);
+  }
+
+  public <T> T decode(String input, Class<T> clazz) throws IOException {
+    return om.readValue(input, clazz);
+  }
+
+  public <T> T decode(byte[] input, Class<T> clazz) throws IOException {
+    return om.readValue(input, clazz);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/SignatureUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/SignatureUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/SignatureUtils.java
index 4f3e338..f599d33 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/SignatureUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/SignatureUtils.java
@@ -23,17 +23,15 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-
-import com.google.common.collect.Sets;
 
 /**
  * Enables to calculate the signature of an object.
  *
  * If the object has methods annotated with {@link Signature}, they will be used.
- * If the object has no methods marked with the annotation; the object itself is used in the signature to prevent incorrect matches.
+ * If the object has no methods marked with the annotation;
+ * the object itself is used in the signature to prevent incorrect matches.
  */
-public class SignatureUtils {
+public final class SignatureUtils {
 
   private static Map<Class<?>, SignatureMapper> mappers = new HashMap<>();
 
@@ -42,28 +40,24 @@ public class SignatureUtils {
     mapper.write(ret, o);
   }
 
-  static class SignatureMapper {
+  /** Prevent construction. */
+  private SignatureUtils() {
+  }
 
-    static final Set<String> acceptedSignatureTypes = Sets.newHashSet();
+  static class SignatureMapper {
 
     private List<Method> sigMethods;
 
     private String classLabel;
 
-    public SignatureMapper(Class<?> o) {
+    SignatureMapper(Class<?> o) {
       Method[] f = o.getMethods();
       sigMethods = new ArrayList<>();
       for (Method method : f) {
         if (method.isAnnotationPresent(Signature.class)) {
-          Class<?> rType = method.getReturnType();
-          String rTypeName = rType.getName();
-          if (!rType.isPrimitive() && acceptedSignatureTypes.contains(rTypeName)) {
-            throw new RuntimeException("unxepected type (" + rTypeName + ") used in signature");
-          }
           sigMethods.add(method);
         }
       }
-
       classLabel = o.getName();
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index e15a49f..fcb6de7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -192,11 +192,15 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
   }
 
   @Explain(displayName = "directory", explainLevels = { Level.EXTENDED })
-  @Signature
   public Path getDirName() {
     return dirName;
   }
 
+  @Signature
+  public String getDirNameString() {
+    return dirName.toString();
+  }
+
   public void setDirName(final Path dirName) {
     this.dirName = dirName;
   }
@@ -216,7 +220,6 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
   }
 
   @Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-  @Signature
   public TableDesc getTableInfo() {
     return tableInfo;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
index a61a47e..d71ba5b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
@@ -308,10 +308,10 @@ public class HashTableSinkDesc extends JoinDesc implements Serializable {
    */
   @Override
   @Explain(displayName = "keys")
-  public Map<Byte, String> getKeysString() {
-    Map<Byte, String> keyMap = new LinkedHashMap<Byte, String>();
+  public Map<String, String> getKeysString() {
+    Map<String, String> keyMap = new LinkedHashMap<>();
     for (Map.Entry<Byte, List<ExprNodeDesc>> k: getKeys().entrySet()) {
-      keyMap.put(k.getKey(), PlanUtils.getExprListString(k.getValue()));
+      keyMap.put(String.valueOf(k.getKey()), PlanUtils.getExprListString(k.getValue()));
     }
     return keyMap;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
index e7ca7f6..95990b8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
@@ -231,14 +231,14 @@ public class JoinDesc extends AbstractOperatorDesc {
    */
   @Explain(displayName = "keys")
   @Signature
-  public Map<Byte, String> getKeysString() {
+  public Map<String, String> getKeysString() {
     if (joinKeys == null) {
       return null;
     }
 
-    Map<Byte, String> keyMap = new LinkedHashMap<Byte, String>();
+    Map<String, String> keyMap = new LinkedHashMap<String, String>();
     for (byte i = 0; i < joinKeys.length; i++) {
-      keyMap.put(i, PlanUtils.getExprListString(Arrays.asList(joinKeys[i])));
+      keyMap.put(String.valueOf(i), PlanUtils.getExprListString(Arrays.asList(joinKeys[i])));
     }
     return keyMap;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
index 54b705d..dc4f085 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
@@ -213,10 +213,10 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
    */
   @Override
   @Explain(displayName = "keys")
-  public Map<Byte, String> getKeysString() {
-    Map<Byte, String> keyMap = new LinkedHashMap<Byte, String>();
+  public Map<String, String> getKeysString() {
+    Map<String, String> keyMap = new LinkedHashMap<>();
     for (Map.Entry<Byte, List<ExprNodeDesc>> k: getKeys().entrySet()) {
-      keyMap.put(k.getKey(), PlanUtils.getExprListString(k.getValue()));
+      keyMap.put(String.valueOf(k.getKey()), PlanUtils.getExprListString(k.getValue()));
     }
     return keyMap;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java
index c515276..2841638 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java
@@ -22,8 +22,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
 import org.apache.hadoop.hive.ql.stats.OperatorStats;
@@ -36,9 +34,8 @@ public class CachingStatsSource implements StatsSource {
 
   private final Cache<OpTreeSignature, OperatorStats> cache;
 
-  public CachingStatsSource(HiveConf conf) {
-    int size = conf.getIntVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_CACHE_SIZE);
-    cache = CacheBuilder.newBuilder().maximumSize(size).build();
+  public CachingStatsSource(int cacheSize) {
+    cache = CacheBuilder.newBuilder().maximumSize(cacheSize).build();
   }
 
   public void put(OpTreeSignature sig, OperatorStats opStat) {

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
index 19df13a..624f107 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
@@ -24,7 +24,7 @@ import java.util.Optional;
 import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
 import org.apache.hadoop.hive.ql.stats.OperatorStats;
 
-public class EmptyStatsSource implements StatsSource {
+public final class EmptyStatsSource implements StatsSource {
 
   public static StatsSource INSTANCE = new EmptyStatsSource();
 

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/MapBackedStatsSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/MapBackedStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/MapBackedStatsSource.java
new file mode 100644
index 0000000..fb2b5f8
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/MapBackedStatsSource.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
+import org.apache.hadoop.hive.ql.stats.OperatorStats;
+
+/**
+ * A {@link StatsSource} which keeps every statistic it receives in a concurrent in-memory map.
+ */
+public class MapBackedStatsSource implements StatsSource {
+
+  private final Map<OpTreeSignature, OperatorStats> map = new ConcurrentHashMap<>();
+
+  @Override
+  public boolean canProvideStatsFor(Class<?> clazz) {
+    return Operator.class.isAssignableFrom(clazz);
+  }
+
+  @Override
+  public Optional<OperatorStats> lookup(OpTreeSignature treeSig) {
+    return Optional.ofNullable(map.get(treeSig));
+  }
+
+  @Override
+  public void putAll(Map<OpTreeSignature, OperatorStats> map) {
+    this.map.putAll(map);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/MetastoreStatsConnector.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/MetastoreStatsConnector.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/MetastoreStatsConnector.java
new file mode 100644
index 0000000..237c1cc
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/MetastoreStatsConnector.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.hadoop.hive.metastore.api.RuntimeStat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
+import org.apache.hadoop.hive.ql.optimizer.signature.RuntimeStatsMap;
+import org.apache.hadoop.hive.ql.optimizer.signature.RuntimeStatsPersister;
+import org.apache.hadoop.hive.ql.stats.OperatorStats;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+
+/**
+ * Decorates a {@link StatsSource} to be loaded and persisted in the metastore as well.
+ */
+class MetastoreStatsConnector implements StatsSource {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MetastoreStatsConnector.class);
+
+  private final StatsSource ss;
+
+  private final ExecutorService executor;
+
+  MetastoreStatsConnector(StatsSource ss) {
+    this.ss = ss;
+    executor = Executors.newSingleThreadExecutor(new BasicThreadFactory.Builder()
+        .namingPattern("Metastore-RuntimeStats-Loader-%d").daemon(true).build());
+    // read back the stats already stored in the metastore on the background thread
+    executor.submit(new RuntimeStatsLoader());
+  }
+
+  private class RuntimeStatsLoader implements Runnable {
+    // unchecked exceptions are logged here as well; the Future of the task is never inspected
+    @Override
+    public void run() {
+      try {
+        List<RuntimeStat> rs = Hive.get().getMSC().getRuntimeStats();
+        for (RuntimeStat thriftStat : rs) {
+          try {
+            ss.putAll(decode(thriftStat));
+          } catch (IOException | RuntimeException e) {
+            logException("Exception while loading runtime stats", e);
+          }
+        }
+      } catch (TException | HiveException | RuntimeException e) {
+        logException("Exception while reading metastore runtime stats", e);
+      }
+    }
+  }
+
+  @Override
+  public boolean canProvideStatsFor(Class<?> clazz) {
+    return ss.canProvideStatsFor(clazz);
+  }
+
+  @Override
+  public Optional<OperatorStats> lookup(OpTreeSignature treeSig) {
+    return ss.lookup(treeSig);
+  }
+
+  @Override
+  public void putAll(Map<OpTreeSignature, OperatorStats> map) {
+    ss.putAll(map);
+    try {
+      executor.submit(new RuntimeStatsSubmitter(map));
+    } catch (java.util.concurrent.RejectedExecutionException e) {
+      // destroy() has shut the executor down; the decorated source already holds the stats
+      logException("Exception while persisting runtime stat", e);
+    }
+  }
+
+  class RuntimeStatsSubmitter implements Runnable {
+    private final Map<OpTreeSignature, OperatorStats> map;
+
+    public RuntimeStatsSubmitter(Map<OpTreeSignature, OperatorStats> map) {
+      this.map = map;
+    }
+
+    @Override
+    public void run() {
+      try {
+        RuntimeStat rec = encode(map);
+        Hive.get().getMSC().addRuntimeStat(rec);
+      } catch (TException | HiveException | IOException | RuntimeException e) {
+        logException("Exception while persisting runtime stat", e);
+      }
+    }
+  }
+
+  private RuntimeStat encode(Map<OpTreeSignature, OperatorStats> map) throws IOException {
+    String payload = RuntimeStatsPersister.INSTANCE.encode(new RuntimeStatsMap(map));
+    RuntimeStat rs = new RuntimeStat();
+    rs.setWeight(map.size());
+    rs.setPayload(ByteBuffer.wrap(payload.getBytes(Charsets.UTF_8)));
+    return rs;
+  }
+
+  private Map<OpTreeSignature, OperatorStats> decode(RuntimeStat rs) throws IOException {
+    RuntimeStatsMap rsm = RuntimeStatsPersister.INSTANCE.decode(rs.getPayload(), RuntimeStatsMap.class);
+    return rsm.toMap();
+  }
+
+  public void destroy() {
+    executor.shutdown();
+  }
+
+  static void logException(String msg, Exception e) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(msg, e);
+    } else {
+      LOG.info(msg + ": " + e.getMessage());
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java
index a372804..e932304 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.hive.ql.plan.mapper;
 
+import java.lang.reflect.Modifier;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -34,6 +36,7 @@ import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
 import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignatureFactory;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
 
 /**
  * Enables to connect related objects to eachother.
@@ -43,7 +46,106 @@ import com.google.common.annotations.VisibleForTesting;
 public class PlanMapper {
 
   Set<EquivGroup> groups = new HashSet<>();
-  private Map<Object, EquivGroup> objectMap = new HashMap<>();
+  private Map<Object, EquivGroup> objectMap = new CompositeMap<>(OpTreeSignature.class);
+
+  /**
+   * Specialized class which can compare by identity or value; based on the key type.
+   * Keys whose class was passed to the constructor are compared using equals/hashCode;
+   * every other key - including null - is compared by reference.
+   */
+  private static class CompositeMap<K, V> implements Map<K, V> {
+
+    private final Map<K, V> comparedMap = new HashMap<>();
+    private final Map<K, V> identityMap = new IdentityHashMap<>();
+    private final Set<Class<?>> typeCompared;
+
+    CompositeMap(Class<?>... comparedTypes) {
+      for (Class<?> class1 : comparedTypes) {
+        // a subclass would report a different getClass() and silently end up identity-compared
+        if (!Modifier.isFinal(class1.getModifiers())) {
+          throw new RuntimeException(class1 + " is not final...for this to reliably work; it should be");
+        }
+      }
+      typeCompared = Sets.newHashSet(comparedTypes);
+    }
+
+    @Override
+    public int size() {
+      return comparedMap.size() + identityMap.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return comparedMap.isEmpty() && identityMap.isEmpty();
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+      return mapFor(key).containsKey(key);
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+      return comparedMap.containsValue(value) || identityMap.containsValue(value);
+    }
+
+    @Override
+    public V get(Object key) {
+      return mapFor(key).get(key);
+    }
+
+    @Override
+    public V put(K key, V value) {
+      return mapFor(key).put(key, value);
+    }
+
+    @Override
+    public V remove(Object key) {
+      return mapFor(key).remove(key);
+    }
+
+    /**
+     * Selects the backing map responsible for the given key; null keys go to the identity map.
+     */
+    private Map<K, V> mapFor(Object key) {
+      return key != null && shouldCompare(key.getClass()) ? comparedMap : identityMap;
+    }
+
+    private boolean shouldCompare(Class<?> key) {
+      return typeCompared.contains(key);
+    }
+
+    @Override
+    public void putAll(Map<? extends K, ? extends V> m) {
+      for (Entry<? extends K, ? extends V> e : m.entrySet()) {
+        put(e.getKey(), e.getValue());
+      }
+    }
+
+    @Override
+    public void clear() {
+      comparedMap.clear();
+      identityMap.clear();
+    }
+
+    /** The returned set is an unmodifiable view over the keys of both backing maps. */
+    @Override
+    public Set<K> keySet() {
+      return Sets.union(comparedMap.keySet(), identityMap.keySet());
+    }
+
+    @Override
+    public Collection<V> values() {
+      throw new UnsupportedOperationException("This method is not supported");
+    }
+
+    /** The returned set is an unmodifiable view over the entries of both backing maps. */
+    @Override
+    public Set<Entry<K, V>> entrySet() {
+      return Sets.union(comparedMap.entrySet(), identityMap.entrySet());
+    }
+
+  }
 
   /**
    * A set of objects which are representing the same thing.
@@ -55,7 +157,7 @@ public class PlanMapper {
    *   there might be more than one, since an optimization may replace an operator with a new one
    *   <li> Signature - to enable inter-plan look up of the same data
    *   <li> OperatorStats - collected runtime information
-   * <ul>
+   * </ul>
    */
   public class EquivGroup {
     Set<Object> members = new HashSet<>();
@@ -116,7 +218,7 @@ public class PlanMapper {
 
   private Object getKeyFor(Object o) {
     if (o instanceof Operator) {
-      Operator operator = (Operator) o;
+      Operator<?> operator = (Operator<?>) o;
       return signatureCache.getSignature(operator);
     }
     return o;

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
deleted file mode 100644
index 3d6c257..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.plan.mapper;
-
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Optional;
-
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
-import org.apache.hadoop.hive.ql.stats.OperatorStats;
-
-public class SimpleRuntimeStatsSource implements StatsSource {
-
-  private final PlanMapper pm;
-
-
-  public SimpleRuntimeStatsSource(PlanMapper pm) {
-    this.pm = pm;
-  }
-
-  @Override
-  public Optional<OperatorStats> lookup(OpTreeSignature sig) {
-    try {
-      List<OperatorStats> v = pm.lookupAll(OperatorStats.class, sig);
-      if (v.size() > 0) {
-        return Optional.of(v.get(0));
-      }
-      return Optional.empty();
-    } catch (NoSuchElementException | IllegalArgumentException iae) {
-      return Optional.empty();
-    }
-  }
-
-  @Override
-  public boolean canProvideStatsFor(Class<?> class1) {
-    if (Operator.class.isAssignableFrom(class1)) {
-      return true;
-    }
-    return false;
-  }
-
-  @Override
-  public void putAll(Map<OpTreeSignature, OperatorStats> map) {
-    throw new RuntimeException();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java
index a4e33c3..30b6a30 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java
@@ -18,14 +18,12 @@
 
 package org.apache.hadoop.hive.ql.plan.mapper;
 
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
 import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper.EquivGroup;
 import org.apache.hadoop.hive.ql.stats.OperatorStats;
@@ -33,53 +31,50 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
 
 public class StatsSources {
 
-  public static class MapBackedStatsSource implements StatsSource {
-
-    private Map<OpTreeSignature, OperatorStats> map = new HashMap<>();
+  private static final Logger LOG = LoggerFactory.getLogger(StatsSources.class);
 
-    @Override
-    public boolean canProvideStatsFor(Class<?> clazz) {
-      if (Operator.class.isAssignableFrom(clazz)) {
-        return true;
-      }
-      return false;
-    }
+  static enum StatsSourceMode {
+    query, hiveserver, metastore;
+  }
 
-    @Override
-    public Optional<OperatorStats> lookup(OpTreeSignature treeSig) {
-      return Optional.ofNullable(map.get(treeSig));
-    }
+  public static void initialize(HiveConf hiveConf) {
+    // requesting for the stats source will implicitly initialize it
+    getStatsSource(hiveConf);
+  }
 
-    @Override
-    public void putAll(Map<OpTreeSignature, OperatorStats> map) {
-      map.putAll(map);
+  public static StatsSource getStatsSource(HiveConf conf) {
+    String mode = conf.getVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE);
+    int cacheSize = conf.getIntVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_CACHE_SIZE);
+    switch (mode) {
+    case "query":
+      return new MapBackedStatsSource();
+    case "hiveserver":
+      return StatsSources.globalStatsSource(cacheSize);
+    case "metastore":
+      return StatsSources.metastoreBackedStatsSource(StatsSources.globalStatsSource(cacheSize));
+    default:
+      throw new RuntimeException("Unknown StatsSource setting: " + mode);
     }
-
   }
 
-  private static final Logger LOG = LoggerFactory.getLogger(StatsSources.class);
-
   public static StatsSource getStatsSourceContaining(StatsSource currentStatsSource, PlanMapper pm) {
-    if (currentStatsSource instanceof CachingStatsSource) {
-      CachingStatsSource sessionStatsSource = (CachingStatsSource) currentStatsSource;
-      loadFromPlanMapper(sessionStatsSource, pm);
-      return sessionStatsSource;
-    } else {
-      return new SimpleRuntimeStatsSource(pm);
+    StatsSource statsSource = currentStatsSource;
+    if (currentStatsSource  == EmptyStatsSource.INSTANCE) {
+      statsSource = new MapBackedStatsSource();
     }
-  }
 
-  public static void loadFromPlanMapper(CachingStatsSource sessionStatsSource, PlanMapper pm) {
-    Map<OpTreeSignature, OperatorStats> map = extractStatMapFromPlanMapper(pm);
-    sessionStatsSource.putAll(map);
+    Map<OpTreeSignature, OperatorStats> statMap = extractStatMapFromPlanMapper(pm);
+    statsSource.putAll(statMap);
+    return statsSource;
   }
 
-
   private static Map<OpTreeSignature, OperatorStats> extractStatMapFromPlanMapper(PlanMapper pm) {
-    Map<OpTreeSignature, OperatorStats> map = new HashMap<OpTreeSignature, OperatorStats>();
+    Builder<OpTreeSignature, OperatorStats> map = ImmutableMap.builder();
     Iterator<EquivGroup> it = pm.iterateGroups();
     while (it.hasNext()) {
       EquivGroup e = it.next();
@@ -103,20 +98,33 @@ public class StatsSources {
         map.put(sig.get(0), stat.get(0));
       }
     }
-    return map;
+    return map.build();
   }
 
   private static StatsSource globalStatsSource;
+  private static MetastoreStatsConnector metastoreStatsConnector;
 
-  public static StatsSource globalStatsSource(HiveConf conf) {
+  public static StatsSource globalStatsSource(int cacheSize) {
     if (globalStatsSource == null) {
-      globalStatsSource = new CachingStatsSource(conf);
+      globalStatsSource = new CachingStatsSource(cacheSize);
     }
     return globalStatsSource;
   }
 
+  public static StatsSource metastoreBackedStatsSource(StatsSource parent) {
+    if (metastoreStatsConnector == null) {
+      metastoreStatsConnector = new MetastoreStatsConnector(parent);
+    }
+    return metastoreStatsConnector;
+  }
+
   @VisibleForTesting
-  public static void clearAllStats() {
+  public static void clearGlobalStats() {
+    if (metastoreStatsConnector != null) {
+      metastoreStatsConnector.destroy();
+    }
     globalStatsSource = null;
+    metastoreStatsConnector = null;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
index 409cc73..8dc7387 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
@@ -84,22 +84,7 @@ public class ReOptimizePlugin implements IReExecutionPlugin {
     alwaysCollectStats = driver.getConf().getBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS);
     statsReaderHook.setCollectOnSuccess(alwaysCollectStats);
 
-    coreDriver.setStatsSource(getStatsSource(driver.getConf()));
-  }
-
-  static enum StatsSourceMode {
-    query, hiveserver;
-  }
-
-  private StatsSource getStatsSource(HiveConf conf) {
-    StatsSourceMode mode = StatsSourceMode.valueOf(conf.getVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE));
-    switch (mode) {
-    case query:
-      return new StatsSources.MapBackedStatsSource();
-    case hiveserver:
-      return StatsSources.globalStatsSource(conf);
-    }
-    throw new RuntimeException("Unknown StatsSource setting: " + mode);
+    coreDriver.setStatsSource(StatsSources.getStatsSource(driver.getConf()));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java
index 52e18a8..d70bb82 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java
@@ -6,7 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,10 +17,20 @@
  */
 package org.apache.hadoop.hive.ql.stats;
 
-public class OperatorStats {
-  private final String operatorId;
+import com.google.common.base.Objects;
+
+/**
+ * Holds information about an operator's statistics.
+ */
+public final class OperatorStats {
+  private String operatorId;
   private long outputRecords;
 
+  // for jackson
+  @SuppressWarnings("unused")
+  private OperatorStats() {
+  }
+
   public OperatorStats(final String opId) {
     this.operatorId = opId;
     this.outputRecords = -1;
@@ -40,4 +52,19 @@ public class OperatorStats {
   public String toString() {
     return String.format("OperatorStats %s records: %d", operatorId, outputRecords);
   }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(operatorId, outputRecords);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if(obj==null || obj.getClass()!= OperatorStats.class){
+      return false;
+    }
+    OperatorStats o = (OperatorStats) obj;
+    return Objects.equal(operatorId, o.operatorId) &&
+        Objects.equal(outputRecords, o.outputRecords);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestRuntimeStatsPersistence.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestRuntimeStatsPersistence.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestRuntimeStatsPersistence.java
new file mode 100644
index 0000000..627c2d8
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestRuntimeStatsPersistence.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.signature;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.stats.OperatorStats;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcat;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestRuntimeStatsPersistence {
+
+  GenericUDF udf = new GenericUDFConcat();
+
+  CompilationOpContext cCtx = new CompilationOpContext();
+
+  private Operator<?> getFilTsOp(int i, int j) {
+    Operator<TableScanDesc> ts = getTsOp(i);
+    Operator<? extends OperatorDesc> fil = getFilterOp(j);
+
+    connectOperators(ts, fil);
+
+    return fil;
+  }
+
+  private void connectOperators(Operator<?> parent, Operator<?> child) {
+    parent.getChildOperators().add(child);
+    child.getParentOperators().add(parent);
+  }
+
+  @Test
+  public void checkPersistJoinCondDesc() throws Exception {
+    JoinCondDesc jcd = new JoinCondDesc(1, 2, 3);
+    JoinCondDesc jcd2 = persistenceLoop(jcd, JoinCondDesc.class);
+    assertEquals(jcd, jcd2);
+  }
+
+  OpTreeSignatureFactory signatureFactory = OpTreeSignatureFactory.newCache();
+
+  @Test
+  public void checkPersistingSigWorks() throws Exception {
+    OpSignature sig = OpSignature.of(getTsOp(3));
+    OpSignature sig2 = persistenceLoop(sig, OpSignature.class);
+    assertEquals(sig, sig2);
+  }
+
+  @Test
+  public void checkPersistingTreeSigWorks() throws Exception {
+    OpTreeSignature sig = signatureFactory.getSignature(getFilTsOp(3, 4));
+    OpTreeSignature sig2 = persistenceLoop(sig, OpTreeSignature.class);
+    assertEquals(sig, sig2);
+  }
+
+  @Test
+  public void checkCanStoreAsGraph() throws Exception {
+    // build a diamond shaped graph: ts feeds fil1 and fil2, which both feed fil3
+    Operator<?> ts = getTsOp(0);
+    Operator<?> fil1 = getFilterOp(1);
+    Operator<?> fil2 = getFilterOp(2);
+    Operator<?> fil3 = getFilterOp(3);
+
+    connectOperators(ts, fil1);
+    connectOperators(ts, fil2);
+    connectOperators(fil1, fil3);
+    connectOperators(fil2, fil3);
+
+    OpTreeSignature sig = signatureFactory.getSignature(fil3);
+    OpTreeSignature sig2 = persistenceLoop(sig, OpTreeSignature.class);
+
+    assertEquals(sig, sig2);
+    // walk up to the grandparent through each of the two parents (presumably the ts signature)
+    OpTreeSignature o0 = sig.getParentSig().get(0).getParentSig().get(0);
+    OpTreeSignature o1 = sig.getParentSig().get(1).getParentSig().get(0);
+    assertTrue("these have to be the same instance", o0 == o1);
+    // the decoded signature has to share a single grandparent instance as well
+    OpTreeSignature p0 = sig2.getParentSig().get(0).getParentSig().get(0);
+    OpTreeSignature p1 = sig2.getParentSig().get(1).getParentSig().get(0);
+
+    assertTrue("these have to be the same instance", p0 == p1);
+
+    assertEquals(p0, p1);
+
+  }
+
+  @Test
+  public void checkCanStoreMap() throws Exception {
+    // two entries keyed by the signatures of two separately built table scans
+    Map<OpTreeSignature, OperatorStats> map = new HashMap<>();
+    map.put(signatureFactory.getSignature(getTsOp(0)), new OperatorStats("ts0"));
+    map.put(signatureFactory.getSignature(getTsOp(1)), new OperatorStats("ts1"));
+
+    RuntimeStatsMap rsm = new RuntimeStatsMap(map);
+
+    RuntimeStatsMap rsm2 = persistenceLoop(rsm, RuntimeStatsMap.class);
+    OpTreeSignature k1 = rsm.toMap().keySet().iterator().next();
+    OpTreeSignature k2 = rsm2.toMap().keySet().iterator().next();
+    assertEquals(k1, k2);
+    assertEquals(rsm, rsm2);
+  }
+
+  private <T> T persistenceLoop(T sig, Class<T> clazz) throws IOException {
+    RuntimeStatsPersister sp = RuntimeStatsPersister.INSTANCE;
+    String stored = sp.encode(sig);
+    System.out.println(stored);
+    T sig2 = sp.decode(stored, clazz);
+    return sig2;
+  }
+
+  private Operator<? extends OperatorDesc> getFilterOp(int constVal) {
+    ExprNodeDesc pred = new ExprNodeConstantDesc(constVal);
+    FilterDesc fd = new FilterDesc(pred, true);
+    Operator<? extends OperatorDesc> op = OperatorFactory.get(cCtx, fd);
+    return op;
+  }
+
+  private Operator<TableScanDesc> getTsOp(int i) {
+    Table tblMetadata = new Table("db", "table");
+    TableScanDesc desc = new TableScanDesc("alias"/*+ cCtx.nextOperatorId()*/, tblMetadata);
+    List<ExprNodeDesc> as =
+        Lists.newArrayList(new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, Integer.valueOf(i)),
+            new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo, "c1", "aa", false));
+    ExprNodeGenericFuncDesc f1 = new ExprNodeGenericFuncDesc(TypeInfoFactory.intTypeInfo, udf, as);
+    desc.setFilterExpr(f1);
+    Operator<TableScanDesc> ts = OperatorFactory.get(cCtx, desc);
+    return ts;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java b/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java
index 8126970..e8a7a1b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java
@@ -33,10 +33,11 @@ import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
 import org.apache.hadoop.hive.ql.parse.ParseException;
+import org.apache.hadoop.hive.ql.plan.mapper.EmptyStatsSource;
 import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
-import org.apache.hadoop.hive.ql.plan.mapper.SimpleRuntimeStatsSource;
-import org.apache.hadoop.hive.ql.reexec.ReExecDriver;
+import org.apache.hadoop.hive.ql.plan.mapper.StatsSources;
 import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper.EquivGroup;
+import org.apache.hadoop.hive.ql.reexec.ReExecDriver;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.OperatorStats;
 import org.apache.hadoop.hive.ql.stats.OperatorStatsReaderHook;
@@ -129,7 +130,7 @@ public class TestCounterMapping {
     FilterOperator filter1 = filters1.get(0);
 
     driver = createDriver();
-    ((ReExecDriver) driver).setStatsSource(new SimpleRuntimeStatsSource(pm1));
+    ((ReExecDriver) driver).setStatsSource(StatsSources.getStatsSourceContaining(EmptyStatsSource.INSTANCE, pm1));
 
     PlanMapper pm2 = getMapperForQuery(driver, query);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestReOptimization.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestReOptimization.java b/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestReOptimization.java
index b726300..8bec56f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestReOptimization.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestReOptimization.java
@@ -27,15 +27,18 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
+import org.apache.hadoop.hive.ql.plan.mapper.StatsSources;
 import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper.EquivGroup;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.OperatorStats;
 import org.apache.hadoop.hive.ql.stats.OperatorStatsReaderHook;
 import org.apache.hive.testutils.HiveTestEnvSetup;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -55,7 +58,7 @@ public class TestReOptimization {
   public static void beforeClass() throws Exception {
     IDriver driver = createDriver("");
     dropTables(driver);
-    String cmds[] = {
+    String[] cmds = {
         // @formatter:off
         "create table tu(id_uv int,id_uw int,u int)",
         "create table tv(id_uv int,v int)",
@@ -78,8 +81,13 @@ public class TestReOptimization {
     dropTables(driver);
   }
 
+  @After
+  public void after() {
+    StatsSources.clearGlobalStats();
+  }
+
   public static void dropTables(IDriver driver) throws Exception {
-    String tables[] = { "tu", "tv", "tw" };
+    String[] tables = new String[] {"tu", "tv", "tw" };
     for (String t : tables) {
       int ret = driver.run("drop table if exists " + t).getResponseCode();
       assertEquals("Checking command success", 0, ret);
@@ -98,7 +106,9 @@ public class TestReOptimization {
   @Test
   public void testStatsAreSetInReopt() throws Exception {
     IDriver driver = createDriver("overlay,reoptimize");
-    String query = "select assert_true_oom(${hiveconf:zzz} > sum(u*v)) from tu join tv on (tu.id_uv=tv.id_uv) where u<10 and v>1";
+    String query = "select assert_true_oom(${hiveconf:zzz} > sum(u*v))"
+        + " from tu join tv on (tu.id_uv=tv.id_uv)"
+        + " where u<10 and v>1";
 
     PlanMapper pm = getMapperForQuery(driver, query);
     Iterator<EquivGroup> itG = pm.iterateGroups();
@@ -133,7 +143,7 @@ public class TestReOptimization {
     IDriver driver = createDriver("overlay,reoptimize");
     String query =
         "select assert_true_oom(${hiveconf:zzz}>sum(1)) from tu join tv on (tu.id_uv=tv.id_uv) where u<10 and v>1";
-    PlanMapper pm = getMapperForQuery(driver, query);
+    getMapperForQuery(driver, query);
 
   }
 
@@ -143,8 +153,72 @@ public class TestReOptimization {
     String query =
         "select assert_true(${hiveconf:zzz}>sum(1)) from tu join tv on (tu.id_uv=tv.id_uv) where u<10 and v>1";
 
+    getMapperForQuery(driver, query);
+    assertEquals(1, driver.getContext().getExecutionIndex());
+  }
+
+  @Test
+  public void testStatCachingQuery() throws Exception {
+    HiveConf conf = env_setup.getTestCtx().hiveConf;
+    conf.setVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE, "query");
+    conf.setBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS, true);
+
+    checkRuntimeStatsReuse(false, false, false);
+  }
+
+  @Test
+  public void testStatCachingHS2() throws Exception {
+    HiveConf conf = env_setup.getTestCtx().hiveConf;
+    conf.setVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE, "hiveserver");
+    conf.setBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS, true);
+
+    checkRuntimeStatsReuse(true, true, false);
+  }
+
+  @Test
+  public void testStatCachingMetaStore() throws Exception {
+    HiveConf conf = env_setup.getTestCtx().hiveConf;
+    conf.setVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE, "metastore");
+    conf.setBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS, true);
+
+    checkRuntimeStatsReuse(true, true, true);
+  }
+
+  private void checkRuntimeStatsReuse(
+      boolean expectInSameSession,
+      boolean expectNewHs2Session,
+      boolean expectHs2Instance) throws CommandProcessorResponse {
+    {
+      // same session
+      IDriver driver = createDriver("reoptimize");
+      checkUsageOfRuntimeStats(driver, false);
+      driver = DriverFactory.newDriver(env_setup.getTestCtx().hiveConf);
+      checkUsageOfRuntimeStats(driver, expectInSameSession);
+    }
+    {
+      // new session
+      IDriver driver = createDriver("reoptimize");
+      checkUsageOfRuntimeStats(driver, expectNewHs2Session);
+    }
+    StatsSources.clearGlobalStats();
+    {
+      // new hs2 instance session
+      IDriver driver = createDriver("reoptimize");
+      checkUsageOfRuntimeStats(driver, expectHs2Instance);
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  private void checkUsageOfRuntimeStats(IDriver driver, boolean expected) throws CommandProcessorResponse {
+    String query = "select sum(u) from tu join tv on (tu.id_uv=tv.id_uv) where u<10 and v>1";
     PlanMapper pm = getMapperForQuery(driver, query);
     assertEquals(1, driver.getContext().getExecutionIndex());
+    List<CommonJoinOperator> allJoin = pm.getAll(CommonJoinOperator.class);
+    CommonJoinOperator join = allJoin.iterator().next();
+    Statistics joinStat = join.getStatistics();
+
+    assertEquals("expectation of the usage of runtime stats doesn't match", expected,
+        joinStat.isRuntimeStats());
   }
 
   @Test
@@ -152,7 +226,7 @@ public class TestReOptimization {
 
     IDriver driver = createDriver("overlay,reoptimize");
     String query = "explain reoptimization select 1 from tu join tv on (tu.id_uv=tv.id_uv) where u<10 and v>1";
-    PlanMapper pm = getMapperForQuery(driver, query);
+    getMapperForQuery(driver, query);
     List<String> res = new ArrayList<>();
     List<String> res1 = new ArrayList<>();
     while (driver.getResults(res1)) {
@@ -165,6 +239,7 @@ public class TestReOptimization {
 
   }
 
+
   private static IDriver createDriver(String strategies) {
     HiveConf conf = env_setup.getTestCtx().hiveConf;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/56c3a957/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 1642357..e373628 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -81,6 +81,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveMaterializedViewsRegistry;
 import org.apache.hadoop.hive.ql.metadata.events.NotificationEventPoll;
+import org.apache.hadoop.hive.ql.plan.mapper.StatsSources;
 import org.apache.hadoop.hive.ql.session.ClearDanglingScratchDir;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
@@ -247,6 +248,8 @@ public class HiveServer2 extends CompositeService {
     // Create views registry
     HiveMaterializedViewsRegistry.get().init();
 
+    StatsSources.initialize(hiveConf);
+
     // Setup cache if enabled.
     if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) {
       try {