You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2018/04/26 14:58:36 UTC
[12/50] [abbrv] hive git commit: HIVE-19171 : Persist runtime
statistics in metastore (Zoltan Haindrich via Ashutosh Chauhan)
HIVE-19171 : Persist runtime statistics in metastore (Zoltan Haindrich via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b3e2d8a0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b3e2d8a0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b3e2d8a0
Branch: refs/heads/storage-branch-2.6
Commit: b3e2d8a05f57a91b12b8347b2763a296c3480d97
Parents: 4f67beb
Author: Zoltan Haindrich <ki...@rxd.hu>
Authored: Mon Apr 23 13:36:11 2018 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Mon Apr 23 13:36:11 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 6 +-
.../listener/DummyRawStoreFailEvent.java | 19 +
.../org/apache/hadoop/hive/ql/QTestUtil.java | 2 +-
.../upgrade/derby/056-HIVE-19171.derby.sql | 10 +
.../ql/optimizer/signature/OpSignature.java | 19 +-
.../ql/optimizer/signature/OpTreeSignature.java | 24 +-
.../signature/OpTreeSignatureFactory.java | 12 +-
.../ql/optimizer/signature/RuntimeStatsMap.java | 83 +
.../signature/RuntimeStatsPersister.java | 54 +
.../ql/optimizer/signature/SignatureUtils.java | 22 +-
.../hadoop/hive/ql/plan/FileSinkDesc.java | 7 +-
.../hadoop/hive/ql/plan/HashTableSinkDesc.java | 6 +-
.../apache/hadoop/hive/ql/plan/JoinDesc.java | 6 +-
.../apache/hadoop/hive/ql/plan/MapJoinDesc.java | 6 +-
.../hive/ql/plan/mapper/CachingStatsSource.java | 7 +-
.../hive/ql/plan/mapper/EmptyStatsSource.java | 2 +-
.../ql/plan/mapper/MetastoreStatsConnector.java | 143 +
.../hadoop/hive/ql/plan/mapper/PlanMapper.java | 108 +-
.../plan/mapper/SimpleRuntimeStatsSource.java | 37 +-
.../hive/ql/plan/mapper/StatsSources.java | 86 +-
.../hadoop/hive/ql/reexec/ReOptimizePlugin.java | 17 +-
.../hadoop/hive/ql/stats/OperatorStats.java | 33 +-
.../signature/TestRuntimeStatsPersistence.java | 165 +
.../ql/plan/mapping/TestCounterMapping.java | 7 +-
.../ql/plan/mapping/TestReOptimization.java | 85 +-
.../apache/hive/service/server/HiveServer2.java | 3 +
.../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp | 5376 ++++++++++--------
.../gen/thrift/gen-cpp/ThriftHiveMetastore.h | 259 +
.../ThriftHiveMetastore_server.skeleton.cpp | 10 +
.../gen/thrift/gen-cpp/hive_metastore_types.cpp | 376 +-
.../gen/thrift/gen-cpp/hive_metastore_types.h | 97 +
.../metastore/api/GetRuntimeStatsRequest.java | 283 +
.../hadoop/hive/metastore/api/RuntimeStat.java | 600 ++
.../hive/metastore/api/ThriftHiveMetastore.java | 2584 +++++++--
.../gen-php/metastore/ThriftHiveMetastore.php | 481 ++
.../src/gen/thrift/gen-php/metastore/Types.php | 171 +
.../hive_metastore/ThriftHiveMetastore-remote | 14 +
.../hive_metastore/ThriftHiveMetastore.py | 409 ++
.../gen/thrift/gen-py/hive_metastore/ttypes.py | 141 +
.../gen/thrift/gen-rb/hive_metastore_types.rb | 37 +
.../gen/thrift/gen-rb/thrift_hive_metastore.rb | 119 +
.../hadoop/hive/metastore/HiveMetaStore.java | 113 +-
.../hive/metastore/HiveMetaStoreClient.java | 140 +-
.../hadoop/hive/metastore/IMetaStoreClient.java | 9 +
.../hadoop/hive/metastore/ObjectStore.java | 248 +-
.../apache/hadoop/hive/metastore/RawStore.java | 15 +-
.../hive/metastore/RuntimeStatsCleanerTask.java | 67 +
.../hive/metastore/cache/CachedStore.java | 21 +-
.../hive/metastore/conf/MetastoreConf.java | 34 +-
.../hive/metastore/model/MRuntimeStat.java | 59 +
.../src/main/resources/package.jdo | 14 +
.../main/sql/derby/hive-schema-3.0.0.derby.sql | 10 +
.../sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql | 10 +
.../main/sql/mssql/hive-schema-3.0.0.mssql.sql | 9 +
.../sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql | 9 +
.../main/sql/mysql/hive-schema-3.0.0.mysql.sql | 10 +
.../sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql | 9 +
.../sql/oracle/hive-schema-3.0.0.oracle.sql | 10 +
.../oracle/upgrade-2.3.0-to-3.0.0.oracle.sql | 9 +
.../sql/postgres/hive-schema-3.0.0.postgres.sql | 11 +
.../upgrade-2.3.0-to-3.0.0.postgres.sql | 9 +
.../src/main/thrift/hive_metastore.thrift | 12 +
.../DummyRawStoreControlledCommit.java | 18 +
.../DummyRawStoreForJdoConnection.java | 17 +
.../HiveMetaStoreClientPreCatalog.java | 10 +
.../hive/metastore/client/TestRuntimeStats.java | 100 +
66 files changed, 9936 insertions(+), 2963 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 2403d7a..f40c606 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4270,15 +4270,15 @@ public class HiveConf extends Configuration {
new StringSet("query", "hiveserver", "metastore"),
"Sets the persistence scope of runtime statistics\n"
+ " query: runtime statistics are only used during re-execution\n"
- + " hiveserver: runtime statistics are persisted in the hiveserver - all sessions share it"),
+ + " hiveserver: runtime statistics are persisted in the hiveserver - all sessions share it\n"
+ + " metastore: runtime statistics are persisted in the metastore as well"),
HIVE_QUERY_MAX_REEXECUTION_COUNT("hive.query.reexecution.max.count", 1,
"Maximum number of re-executions for a single query."),
HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS("hive.query.reexecution.always.collect.operator.stats", false,
"If sessionstats are enabled; this option can be used to collect statistics all the time"),
HIVE_QUERY_REEXECUTION_STATS_CACHE_SIZE("hive.query.reexecution.stats.cache.size", 100_000,
- "Size of the runtime statistics cache. Unit is: OperatorStat entry; a query plan consist ~100"),
-
+ "Size of the runtime statistics cache. Unit is: OperatorStat entry; a query plan consist ~100. See also: runtime.stats.max.entries"),
HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true,
"If the query results cache is enabled. This will keep results of previously executed queries " +
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 801de7a..8ecbaad 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hive.metastore.api.WMTrigger;
import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse;
import org.apache.hadoop.hive.metastore.api.Role;
import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant;
+import org.apache.hadoop.hive.metastore.api.RuntimeStat;
import org.apache.hadoop.hive.metastore.api.SQLCheckConstraint;
import org.apache.hadoop.hive.metastore.api.SQLDefaultConstraint;
import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
@@ -295,6 +296,7 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
objectStore.updateCreationMetadata(catName, dbname, tablename, cm);
}
+ @Override
public void alterTable(String catName, String dbName, String name, Table newTable)
throws InvalidObjectException, MetaException {
if (shouldEventSucceed) {
@@ -1126,6 +1128,7 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
return null;
}
+ @Override
public void createISchema(ISchema schema) throws AlreadyExistsException, MetaException,
NoSuchObjectException {
objectStore.createISchema(schema);
@@ -1195,4 +1198,20 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
public void addSerde(SerDeInfo serde) throws AlreadyExistsException, MetaException {
objectStore.addSerde(serde);
}
+
+ @Override
+ public void addRuntimeStat(RuntimeStat stat) throws MetaException {
+ objectStore.addRuntimeStat(stat);
+ }
+
+ @Override
+ public List<RuntimeStat> getRuntimeStats() throws MetaException {
+ return objectStore.getRuntimeStats();
+ }
+
+ @Override
+ public int deleteRuntimeStats(int maxRetained, int maxRetainSecs) {
+ return 0;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 88022be..750fc69 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -1067,7 +1067,7 @@ public class QTestUtil {
clearTablesCreatedDuringTests();
clearUDFsCreatedDuringTests();
clearKeysCreatedInTests();
- StatsSources.clearAllStats();
+ StatsSources.clearGlobalStats();
}
protected void clearSettingsCreatedInTests() throws IOException {
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/metastore/scripts/upgrade/derby/056-HIVE-19171.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/056-HIVE-19171.derby.sql b/metastore/scripts/upgrade/derby/056-HIVE-19171.derby.sql
new file mode 100644
index 0000000..ef6c77b
--- /dev/null
+++ b/metastore/scripts/upgrade/derby/056-HIVE-19171.derby.sql
@@ -0,0 +1,10 @@
+
+
+CREATE TABLE "APP"."RUNTIME_STATS" (
+ "RS_ID" bigint primary key,
+ "CREATE_TIME" integer not null,
+ "WEIGHT" integer not null,
+ "PAYLOAD" BLOB
+);
+
+CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpSignature.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpSignature.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpSignature.java
index e87bbce..f626bd5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpSignature.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpSignature.java
@@ -25,18 +25,28 @@ import java.util.Map.Entry;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import com.fasterxml.jackson.annotation.JsonIdentityInfo;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.ObjectIdGenerators;
import com.google.common.annotations.VisibleForTesting;
/**
* Signature of the operator(non-recursive).
*/
-public class OpSignature {
+@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id")
+public final class OpSignature {
/**
* Holds the signature of the operator; the keys are the names of the methods marked by {@link Signature}.
*/
+ @JsonProperty
private Map<String, Object> sigMap;
+ // need this for Jackson to work
+ @SuppressWarnings("unused")
+ private OpSignature() {
+ }
+
private OpSignature(Operator<? extends OperatorDesc> op) {
sigMap = new HashMap<>();
// FIXME: consider other operator info as well..not just conf?
@@ -70,7 +80,7 @@ public class OpSignature {
@VisibleForTesting
public void proveEquals(OpSignature other) {
- proveEquals(sigMap,other.sigMap);
+ proveEquals(sigMap, other.sigMap);
}
private static void proveEquals(Map<String, Object> m1, Map<String, Object> m2) {
@@ -103,4 +113,9 @@ public class OpSignature {
}
return sb.toString();
}
+
+ public Map<String, Object> getSigMap() {
+ return sigMap;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignature.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignature.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignature.java
index c3dc848..f774158 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignature.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignature.java
@@ -24,14 +24,28 @@ import java.util.Objects;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import com.fasterxml.jackson.annotation.JsonIdentityInfo;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.ObjectIdGenerators;
+
/**
* Operator tree signature.
*/
-public class OpTreeSignature {
+@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id")
+public final class OpTreeSignature {
+
+ @JsonProperty
private int hashCode;
+ @JsonProperty
private OpSignature sig;
+ @JsonProperty
private ArrayList<OpTreeSignature> parentSig;
+ // need this for Jackson to work
+ @SuppressWarnings("unused")
+ private OpTreeSignature() {
+ }
+
OpTreeSignature(Operator<?> op, OpTreeSignatureFactory osf) {
sig = OpSignature.of(op);
parentSig = new ArrayList<>();
@@ -82,4 +96,12 @@ public class OpTreeSignature {
return sb.toString();
}
+ public OpSignature getSig() {
+ return sig;
+ }
+
+ public ArrayList<OpTreeSignature> getParentSig() {
+ return parentSig;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignatureFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignatureFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignatureFactory.java
index 3df5ee9..80a3edf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignatureFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignatureFactory.java
@@ -29,22 +29,22 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
*/
public interface OpTreeSignatureFactory {
- public OpTreeSignature getSignature(Operator<? extends OperatorDesc> op);
+ OpTreeSignature getSignature(Operator<? extends OperatorDesc> op);
- static final OpTreeSignatureFactory DIRECT = new Direct();
+ OpTreeSignatureFactory DIRECT = new Direct();
- public static OpTreeSignatureFactory direct() {
+ static OpTreeSignatureFactory direct() {
return DIRECT;
}
- public static OpTreeSignatureFactory newCache() {
+ static OpTreeSignatureFactory newCache() {
return new CachedFactory();
}
// FIXME: possible alternative: move both OpSignature/OpTreeSignature into
// under some class as nested ones; and that way this factory level caching can be made "transparent"
- static class Direct implements OpTreeSignatureFactory {
+ class Direct implements OpTreeSignatureFactory {
@Override
public OpTreeSignature getSignature(Operator<? extends OperatorDesc> op) {
@@ -53,7 +53,7 @@ public interface OpTreeSignatureFactory {
}
- static class CachedFactory implements OpTreeSignatureFactory {
+ class CachedFactory implements OpTreeSignatureFactory {
Map<Operator<? extends OperatorDesc>, OpTreeSignature> cache = new IdentityHashMap<>();
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/RuntimeStatsMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/RuntimeStatsMap.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/RuntimeStatsMap.java
new file mode 100644
index 0000000..195a8b1
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/RuntimeStatsMap.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.signature;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hive.ql.stats.OperatorStats;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Objects;
+
+/**
+ * This class makes it easier for Jackson to comprehend the map type.
+ *
+ * Instead of convincing Jackson to store the map with serializers and type factory tricks,
+ * this class is a simple "repacker" to and from lists.
+ */
+public final class RuntimeStatsMap {
+ @JsonProperty
+ private List<OpTreeSignature> sigs;
+ @JsonProperty
+ private List<OperatorStats> ss;
+
+ RuntimeStatsMap() {
+ }
+
+
+ public RuntimeStatsMap(Map<OpTreeSignature, OperatorStats> input) {
+ sigs = new ArrayList<>(input.size());
+ ss = new ArrayList<>(input.size());
+ for (Entry<OpTreeSignature, OperatorStats> ent : input.entrySet()) {
+ sigs.add(ent.getKey());
+ ss.add(ent.getValue());
+ }
+ }
+
+ public Map<OpTreeSignature, OperatorStats> toMap() throws IOException {
+ if (sigs.size() != ss.size()) {
+ throw new IOException("constraint validation");
+ }
+ Map<OpTreeSignature, OperatorStats> ret = new HashMap<>();
+ for (int i = 0; i < sigs.size(); i++) {
+ ret.put(sigs.get(i), ss.get(i));
+ }
+ return ret;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(sigs, ss);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || obj.getClass() != RuntimeStatsMap.class) {
+ return false;
+ }
+ RuntimeStatsMap o = (RuntimeStatsMap) obj;
+ return Objects.equal(sigs, o.sigs) &&
+ Objects.equal(ss, o.ss);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/RuntimeStatsPersister.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/RuntimeStatsPersister.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/RuntimeStatsPersister.java
new file mode 100644
index 0000000..696fe1f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/RuntimeStatsPersister.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.signature;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
+/**
+ * Enables encoding/decoding of runtime statistics values to and from textual form.
+ */
+public class RuntimeStatsPersister {
+ public static final RuntimeStatsPersister INSTANCE = new RuntimeStatsPersister();
+
+ private final ObjectMapper om;
+
+ RuntimeStatsPersister() {
+ om = new ObjectMapper();
+ om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
+ om.configure(SerializationFeature.INDENT_OUTPUT, true);
+ om.configure(MapperFeature.REQUIRE_SETTERS_FOR_GETTERS, true);
+ }
+
+ public <T> String encode(T input) throws IOException {
+ return om.writeValueAsString(input);
+ }
+
+ public <T> T decode(String input, Class<T> clazz) throws IOException {
+ return om.readValue(input, clazz);
+ }
+
+ public <T> T decode(byte[] input, Class<T> clazz) throws IOException {
+ return om.readValue(input, clazz);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/SignatureUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/SignatureUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/SignatureUtils.java
index 4f3e338..f599d33 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/SignatureUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/SignatureUtils.java
@@ -23,17 +23,15 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-
-import com.google.common.collect.Sets;
/**
* Enables calculating the signature of an object.
*
* If the object has methods annotated with {@link Signature}, they will be used.
- * If the object has no methods marked with the annotation; the object itself is used in the signature to prevent incorrect matches.
+ * If the object has no methods marked with the annotation;
+ * the object itself is used in the signature to prevent incorrect matches.
*/
-public class SignatureUtils {
+public final class SignatureUtils {
private static Map<Class<?>, SignatureMapper> mappers = new HashMap<>();
@@ -42,28 +40,24 @@ public class SignatureUtils {
mapper.write(ret, o);
}
- static class SignatureMapper {
+ /** Prevent construction. */
+ private SignatureUtils() {
+ }
- static final Set<String> acceptedSignatureTypes = Sets.newHashSet();
+ static class SignatureMapper {
private List<Method> sigMethods;
private String classLabel;
- public SignatureMapper(Class<?> o) {
+ SignatureMapper(Class<?> o) {
Method[] f = o.getMethods();
sigMethods = new ArrayList<>();
for (Method method : f) {
if (method.isAnnotationPresent(Signature.class)) {
- Class<?> rType = method.getReturnType();
- String rTypeName = rType.getName();
- if (!rType.isPrimitive() && acceptedSignatureTypes.contains(rTypeName)) {
- throw new RuntimeException("unxepected type (" + rTypeName + ") used in signature");
- }
sigMethods.add(method);
}
}
-
classLabel = o.getName();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index e15a49f..fcb6de7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -192,11 +192,15 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
}
@Explain(displayName = "directory", explainLevels = { Level.EXTENDED })
- @Signature
public Path getDirName() {
return dirName;
}
+ @Signature
+ public String getDirNameString() {
+ return dirName.toString();
+ }
+
public void setDirName(final Path dirName) {
this.dirName = dirName;
}
@@ -216,7 +220,6 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
}
@Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
- @Signature
public TableDesc getTableInfo() {
return tableInfo;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
index a61a47e..d71ba5b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
@@ -308,10 +308,10 @@ public class HashTableSinkDesc extends JoinDesc implements Serializable {
*/
@Override
@Explain(displayName = "keys")
- public Map<Byte, String> getKeysString() {
- Map<Byte, String> keyMap = new LinkedHashMap<Byte, String>();
+ public Map<String, String> getKeysString() {
+ Map<String, String> keyMap = new LinkedHashMap<>();
for (Map.Entry<Byte, List<ExprNodeDesc>> k: getKeys().entrySet()) {
- keyMap.put(k.getKey(), PlanUtils.getExprListString(k.getValue()));
+ keyMap.put(String.valueOf(k.getKey()), PlanUtils.getExprListString(k.getValue()));
}
return keyMap;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
index e7ca7f6..95990b8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
@@ -231,14 +231,14 @@ public class JoinDesc extends AbstractOperatorDesc {
*/
@Explain(displayName = "keys")
@Signature
- public Map<Byte, String> getKeysString() {
+ public Map<String, String> getKeysString() {
if (joinKeys == null) {
return null;
}
- Map<Byte, String> keyMap = new LinkedHashMap<Byte, String>();
+ Map<String, String> keyMap = new LinkedHashMap<String, String>();
for (byte i = 0; i < joinKeys.length; i++) {
- keyMap.put(i, PlanUtils.getExprListString(Arrays.asList(joinKeys[i])));
+ keyMap.put(String.valueOf(i), PlanUtils.getExprListString(Arrays.asList(joinKeys[i])));
}
return keyMap;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
index 54b705d..dc4f085 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
@@ -213,10 +213,10 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
*/
@Override
@Explain(displayName = "keys")
- public Map<Byte, String> getKeysString() {
- Map<Byte, String> keyMap = new LinkedHashMap<Byte, String>();
+ public Map<String, String> getKeysString() {
+ Map<String, String> keyMap = new LinkedHashMap<>();
for (Map.Entry<Byte, List<ExprNodeDesc>> k: getKeys().entrySet()) {
- keyMap.put(k.getKey(), PlanUtils.getExprListString(k.getValue()));
+ keyMap.put(String.valueOf(k.getKey()), PlanUtils.getExprListString(k.getValue()));
}
return keyMap;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java
index c515276..2841638 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java
@@ -22,8 +22,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
import org.apache.hadoop.hive.ql.stats.OperatorStats;
@@ -36,9 +34,8 @@ public class CachingStatsSource implements StatsSource {
private final Cache<OpTreeSignature, OperatorStats> cache;
- public CachingStatsSource(HiveConf conf) {
- int size = conf.getIntVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_CACHE_SIZE);
- cache = CacheBuilder.newBuilder().maximumSize(size).build();
+ public CachingStatsSource(int cacheSize) {
+ cache = CacheBuilder.newBuilder().maximumSize(cacheSize).build();
}
public void put(OpTreeSignature sig, OperatorStats opStat) {
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
index 19df13a..624f107 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
@@ -24,7 +24,7 @@ import java.util.Optional;
import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
import org.apache.hadoop.hive.ql.stats.OperatorStats;
-public class EmptyStatsSource implements StatsSource {
+public final class EmptyStatsSource implements StatsSource {
public static StatsSource INSTANCE = new EmptyStatsSource();
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/MetastoreStatsConnector.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/MetastoreStatsConnector.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/MetastoreStatsConnector.java
new file mode 100644
index 0000000..237c1cc
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/MetastoreStatsConnector.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.hadoop.hive.metastore.api.RuntimeStat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
+import org.apache.hadoop.hive.ql.optimizer.signature.RuntimeStatsMap;
+import org.apache.hadoop.hive.ql.optimizer.signature.RuntimeStatsPersister;
+import org.apache.hadoop.hive.ql.stats.OperatorStats;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+
+/**
+ * Decorates a {@link StatsSource} so that its contents are also loaded from and persisted to the
+ * metastore.
+ *
+ * <p>Lookups are answered by the decorated source only. Metastore access (the initial load and
+ * every later write) is queued on a single-threaded daemon executor, so callers of this class do
+ * not wait for it.
+ */
+class MetastoreStatsConnector implements StatsSource {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MetastoreStatsConnector.class);
+
+  /** The decorated, in-memory source. */
+  private final StatsSource ss;
+
+  /** Runs the loader task first, then the submitter tasks in submission order. */
+  private final ExecutorService executor;
+
+  /**
+   * Wraps the given source and immediately schedules loading of the stats stored in the metastore.
+   *
+   * @param ss the source which receives the loaded stats and serves the lookups
+   */
+  MetastoreStatsConnector(StatsSource ss) {
+    this.ss = ss;
+    executor = Executors.newSingleThreadExecutor(
+        new BasicThreadFactory.Builder()
+            .namingPattern("Metastore-RuntimeStats-Loader-%d")
+            .daemon(true)
+            .build());
+
+    executor.submit(new RuntimeStatsLoader());
+  }
+
+  /**
+   * Reads the runtime stat records from the metastore and feeds them into the decorated source.
+   */
+  private class RuntimeStatsLoader implements Runnable {
+
+    @Override
+    public void run() {
+      try {
+        List<RuntimeStat> rs = Hive.get().getMSC().getRuntimeStats();
+        for (RuntimeStat thriftStat : rs) {
+          try {
+            ss.putAll(decode(thriftStat));
+          } catch (IOException | RuntimeException e) {
+            // a single unreadable record must not prevent loading the remaining ones
+            logException("Exception while loading runtime stats", e);
+          }
+        }
+      } catch (Exception e) {
+        // Top of an executor task: an exception escaping from here would only be stored in a
+        // Future which nobody inspects, so every failure is logged instead of being lost.
+        logException("Exception while reading metastore runtime stats", e);
+      }
+    }
+  }
+
+  @Override
+  public boolean canProvideStatsFor(Class<?> clazz) {
+    return ss.canProvideStatsFor(clazz);
+  }
+
+  @Override
+  public Optional<OperatorStats> lookup(OpTreeSignature treeSig) {
+    return ss.lookup(treeSig);
+  }
+
+  /**
+   * Adds the stats to the decorated source and queues them for persisting in the metastore.
+   *
+   * <p>Empty maps are not persisted. If the connector has already been destroyed the decorated
+   * source is still updated; only the metastore write is skipped (and logged).
+   *
+   * @param map the stats to add
+   */
+  @Override
+  public void putAll(Map<OpTreeSignature, OperatorStats> map) {
+    ss.putAll(map);
+    if (map.isEmpty()) {
+      // there is nothing to store; avoid writing a zero-weight record
+      return;
+    }
+    try {
+      executor.submit(new RuntimeStatsSubmitter(map));
+    } catch (RejectedExecutionException e) {
+      // destroy() has shut the executor down; persisting is best-effort, so do not fail the caller
+      logException("Runtime stats were not persisted; the connector is already destroyed", e);
+    }
+  }
+
+  /**
+   * Encodes one stats map and stores it in the metastore as a single record.
+   */
+  class RuntimeStatsSubmitter implements Runnable {
+
+    private final Map<OpTreeSignature, OperatorStats> map;
+
+    public RuntimeStatsSubmitter(Map<OpTreeSignature, OperatorStats> map) {
+      this.map = map;
+    }
+
+    @Override
+    public void run() {
+      try {
+        RuntimeStat rec = encode(map);
+        Hive.get().getMSC().addRuntimeStat(rec);
+      } catch (Exception e) {
+        // Top of an executor task, see RuntimeStatsLoader: log instead of losing the failure.
+        logException("Exception while persisting runtime stat", e);
+      }
+    }
+  }
+
+  /**
+   * Builds the metastore record for a stats map: the payload is the UTF-8 encoded output of the
+   * persister, the weight is the number of entries in the map.
+   */
+  private RuntimeStat encode(Map<OpTreeSignature, OperatorStats> map) throws IOException {
+    String payload = RuntimeStatsPersister.INSTANCE.encode(new RuntimeStatsMap(map));
+    RuntimeStat rs = new RuntimeStat();
+    rs.setWeight(map.size());
+    rs.setPayload(ByteBuffer.wrap(payload.getBytes(Charsets.UTF_8)));
+    return rs;
+  }
+
+  /** Restores the stats map from the payload of a metastore record. */
+  private Map<OpTreeSignature, OperatorStats> decode(RuntimeStat rs) throws IOException {
+    RuntimeStatsMap rsm = RuntimeStatsPersister.INSTANCE.decode(rs.getPayload(), RuntimeStatsMap.class);
+    return rsm.toMap();
+  }
+
+  /**
+   * Shuts the executor down; tasks queued earlier are still executed, new ones are not accepted.
+   */
+  public void destroy() {
+    executor.shutdown();
+  }
+
+  /**
+   * Logs a failure: with the full stack trace on debug level, otherwise only the message on info
+   * level.
+   */
+  static void logException(String msg, Exception e) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(msg, e);
+    } else {
+      LOG.info(msg + ": " + e.getMessage());
+    }
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java
index a372804..e932304 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.hive.ql.plan.mapper;
+import java.lang.reflect.Modifier;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -34,6 +36,7 @@ import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignatureFactory;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
/**
* Enables to connect related objects to eachother.
@@ -43,7 +46,106 @@ import com.google.common.annotations.VisibleForTesting;
public class PlanMapper {
Set<EquivGroup> groups = new HashSet<>();
- private Map<Object, EquivGroup> objectMap = new HashMap<>();
+ private Map<Object, EquivGroup> objectMap = new CompositeMap<>(OpTreeSignature.class);
+
+ /**
+ * Specialized class which can compare by identity or value; based on the key type.
+ */
+ private static class CompositeMap<K, V> implements Map<K, V> {
+
+ Map<K, V> comparedMap = new HashMap<>();
+ Map<K, V> identityMap = new IdentityHashMap<>();
+ final Set<Class<?>> typeCompared;
+
+ CompositeMap(Class<?>... comparedTypes) {
+ for (Class<?> class1 : comparedTypes) {
+ if (!Modifier.isFinal(class1.getModifiers())) {
+ throw new RuntimeException(class1 + " is not final...for this to reliably work; it should be");
+ }
+ }
+ typeCompared = Sets.newHashSet(comparedTypes);
+ }
+
+ @Override
+ public int size() {
+ return comparedMap.size() + identityMap.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return comparedMap.isEmpty() && identityMap.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return comparedMap.containsKey(key) || identityMap.containsKey(key);
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ return comparedMap.containsValue(value) || identityMap.containsValue(value);
+ }
+
+ @Override
+ public V get(Object key) {
+ V v0 = comparedMap.get(key);
+ if (v0 != null) {
+ return v0;
+ }
+ return identityMap.get(key);
+ }
+
+ @Override
+ public V put(K key, V value) {
+ if (shouldCompare(key.getClass())) {
+ return comparedMap.put(key, value);
+ } else {
+ return identityMap.put(key, value);
+ }
+ }
+
+ @Override
+ public V remove(Object key) {
+ if (shouldCompare(key.getClass())) {
+ return comparedMap.remove(key);
+ } else {
+ return identityMap.remove(key);
+ }
+ }
+
+ private boolean shouldCompare(Class<?> key) {
+ return typeCompared.contains(key);
+ }
+
+ @Override
+ public void putAll(Map<? extends K, ? extends V> m) {
+ for (Entry<? extends K, ? extends V> e : m.entrySet()) {
+ put(e.getKey(), e.getValue());
+ }
+ }
+
+ @Override
+ public void clear() {
+ comparedMap.clear();
+ identityMap.clear();
+ }
+
+ @Override
+ public Set<K> keySet() {
+ return Sets.union(comparedMap.keySet(), identityMap.keySet());
+ }
+
+ @Override
+ public Collection<V> values() {
+ throw new UnsupportedOperationException("This method is not supported");
+ }
+
+ @Override
+ public Set<Entry<K, V>> entrySet() {
+ return Sets.union(comparedMap.entrySet(), identityMap.entrySet());
+ }
+
+ }
/**
* A set of objects which are representing the same thing.
@@ -55,7 +157,7 @@ public class PlanMapper {
* there might be more than one, since an optimization may replace an operator with a new one
* <li> Signature - to enable inter-plan look up of the same data
* <li> OperatorStats - collected runtime information
- * <ul>
+ * </ul>
*/
public class EquivGroup {
Set<Object> members = new HashSet<>();
@@ -116,7 +218,7 @@ public class PlanMapper {
private Object getKeyFor(Object o) {
if (o instanceof Operator) {
- Operator operator = (Operator) o;
+ Operator<?> operator = (Operator<?>) o;
return signatureCache.getSignature(operator);
}
return o;
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
index 3d6c257..fb2b5f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
@@ -18,48 +18,33 @@
package org.apache.hadoop.hive.ql.plan.mapper;
-import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
import org.apache.hadoop.hive.ql.stats.OperatorStats;
-public class SimpleRuntimeStatsSource implements StatsSource {
+public class MapBackedStatsSource implements StatsSource {
- private final PlanMapper pm;
-
-
- public SimpleRuntimeStatsSource(PlanMapper pm) {
- this.pm = pm;
- }
+ private Map<OpTreeSignature, OperatorStats> map = new ConcurrentHashMap<>();
@Override
- public Optional<OperatorStats> lookup(OpTreeSignature sig) {
- try {
- List<OperatorStats> v = pm.lookupAll(OperatorStats.class, sig);
- if (v.size() > 0) {
- return Optional.of(v.get(0));
- }
- return Optional.empty();
- } catch (NoSuchElementException | IllegalArgumentException iae) {
- return Optional.empty();
+ public boolean canProvideStatsFor(Class<?> clazz) {
+ if (Operator.class.isAssignableFrom(clazz)) {
+ return true;
}
+ return false;
}
@Override
- public boolean canProvideStatsFor(Class<?> class1) {
- if (Operator.class.isAssignableFrom(class1)) {
- return true;
- }
- return false;
+ public Optional<OperatorStats> lookup(OpTreeSignature treeSig) {
+ return Optional.ofNullable(map.get(treeSig));
}
@Override
public void putAll(Map<OpTreeSignature, OperatorStats> map) {
- throw new RuntimeException();
+ this.map.putAll(map);
}
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java
index a4e33c3..30b6a30 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java
@@ -18,14 +18,12 @@
package org.apache.hadoop.hive.ql.plan.mapper;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper.EquivGroup;
import org.apache.hadoop.hive.ql.stats.OperatorStats;
@@ -33,53 +31,50 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
public class StatsSources {
- public static class MapBackedStatsSource implements StatsSource {
-
- private Map<OpTreeSignature, OperatorStats> map = new HashMap<>();
+ private static final Logger LOG = LoggerFactory.getLogger(StatsSources.class);
- @Override
- public boolean canProvideStatsFor(Class<?> clazz) {
- if (Operator.class.isAssignableFrom(clazz)) {
- return true;
- }
- return false;
- }
+ static enum StatsSourceMode {
+ query, hiveserver, metastore;
+ }
- @Override
- public Optional<OperatorStats> lookup(OpTreeSignature treeSig) {
- return Optional.ofNullable(map.get(treeSig));
- }
+ public static void initialize(HiveConf hiveConf) {
+ // requesting for the stats source will implicitly initialize it
+ getStatsSource(hiveConf);
+ }
- @Override
- public void putAll(Map<OpTreeSignature, OperatorStats> map) {
- map.putAll(map);
+ public static StatsSource getStatsSource(HiveConf conf) {
+ String mode = conf.getVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE);
+ int cacheSize = conf.getIntVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_CACHE_SIZE);
+ switch (mode) {
+ case "query":
+ return new MapBackedStatsSource();
+ case "hiveserver":
+ return StatsSources.globalStatsSource(cacheSize);
+ case "metastore":
+ return StatsSources.metastoreBackedStatsSource(StatsSources.globalStatsSource(cacheSize));
+ default:
+ throw new RuntimeException("Unknown StatsSource setting: " + mode);
}
-
}
- private static final Logger LOG = LoggerFactory.getLogger(StatsSources.class);
-
public static StatsSource getStatsSourceContaining(StatsSource currentStatsSource, PlanMapper pm) {
- if (currentStatsSource instanceof CachingStatsSource) {
- CachingStatsSource sessionStatsSource = (CachingStatsSource) currentStatsSource;
- loadFromPlanMapper(sessionStatsSource, pm);
- return sessionStatsSource;
- } else {
- return new SimpleRuntimeStatsSource(pm);
+ StatsSource statsSource = currentStatsSource;
+ if (currentStatsSource == EmptyStatsSource.INSTANCE) {
+ statsSource = new MapBackedStatsSource();
}
- }
- public static void loadFromPlanMapper(CachingStatsSource sessionStatsSource, PlanMapper pm) {
- Map<OpTreeSignature, OperatorStats> map = extractStatMapFromPlanMapper(pm);
- sessionStatsSource.putAll(map);
+ Map<OpTreeSignature, OperatorStats> statMap = extractStatMapFromPlanMapper(pm);
+ statsSource.putAll(statMap);
+ return statsSource;
}
-
private static Map<OpTreeSignature, OperatorStats> extractStatMapFromPlanMapper(PlanMapper pm) {
- Map<OpTreeSignature, OperatorStats> map = new HashMap<OpTreeSignature, OperatorStats>();
+ Builder<OpTreeSignature, OperatorStats> map = ImmutableMap.builder();
Iterator<EquivGroup> it = pm.iterateGroups();
while (it.hasNext()) {
EquivGroup e = it.next();
@@ -103,20 +98,33 @@ public class StatsSources {
map.put(sig.get(0), stat.get(0));
}
}
- return map;
+ return map.build();
}
private static StatsSource globalStatsSource;
+ private static MetastoreStatsConnector metastoreStatsConnector;
- public static StatsSource globalStatsSource(HiveConf conf) {
+ public static StatsSource globalStatsSource(int cacheSize) {
if (globalStatsSource == null) {
- globalStatsSource = new CachingStatsSource(conf);
+ globalStatsSource = new CachingStatsSource(cacheSize);
}
return globalStatsSource;
}
+ public static StatsSource metastoreBackedStatsSource(StatsSource parent) {
+ if (metastoreStatsConnector == null) {
+ metastoreStatsConnector = new MetastoreStatsConnector(parent);
+ }
+ return metastoreStatsConnector;
+ }
+
@VisibleForTesting
- public static void clearAllStats() {
+ public static void clearGlobalStats() {
+ if (metastoreStatsConnector != null) {
+ metastoreStatsConnector.destroy();
+ }
globalStatsSource = null;
+ metastoreStatsConnector = null;
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
index 409cc73..8dc7387 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
@@ -84,22 +84,7 @@ public class ReOptimizePlugin implements IReExecutionPlugin {
alwaysCollectStats = driver.getConf().getBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS);
statsReaderHook.setCollectOnSuccess(alwaysCollectStats);
- coreDriver.setStatsSource(getStatsSource(driver.getConf()));
- }
-
- static enum StatsSourceMode {
- query, hiveserver;
- }
-
- private StatsSource getStatsSource(HiveConf conf) {
- StatsSourceMode mode = StatsSourceMode.valueOf(conf.getVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE));
- switch (mode) {
- case query:
- return new StatsSources.MapBackedStatsSource();
- case hiveserver:
- return StatsSources.globalStatsSource(conf);
- }
- throw new RuntimeException("Unknown StatsSource setting: " + mode);
+ coreDriver.setStatsSource(StatsSources.getStatsSource(driver.getConf()));
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java
index 52e18a8..d70bb82 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java
@@ -6,7 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,10 +17,20 @@
*/
package org.apache.hadoop.hive.ql.stats;
-public class OperatorStats {
- private final String operatorId;
+import com.google.common.base.Objects;
+
+/**
+ * Holds information about an operator's statistics.
+ */
+public final class OperatorStats {
+ private String operatorId;
private long outputRecords;
+ // for jackson
+ @SuppressWarnings("unused")
+ private OperatorStats() {
+ }
+
public OperatorStats(final String opId) {
this.operatorId = opId;
this.outputRecords = -1;
@@ -40,4 +52,19 @@ public class OperatorStats {
public String toString() {
return String.format("OperatorStats %s records: %d", operatorId, outputRecords);
}
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(operatorId, outputRecords);
+ }
+
+  /**
+   * Two instances are equal when both are exactly of type {@code OperatorStats} and they carry the
+   * same operator id and the same output record count.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (obj.getClass() != OperatorStats.class) {
+      return false;
+    }
+    OperatorStats other = (OperatorStats) obj;
+    if (outputRecords != other.outputRecords) {
+      return false;
+    }
+    return Objects.equal(operatorId, other.operatorId);
+  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestRuntimeStatsPersistence.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestRuntimeStatsPersistence.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestRuntimeStatsPersistence.java
new file mode 100644
index 0000000..627c2d8
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestRuntimeStatsPersistence.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.signature;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.stats.OperatorStats;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcat;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Checks that signatures and runtime statistics survive an encode/decode round trip through
+ * {@link RuntimeStatsPersister}.
+ */
+public class TestRuntimeStatsPersistence {
+
+  GenericUDF udf = new GenericUDFConcat();
+
+  CompilationOpContext cCtx = new CompilationOpContext();
+
+  OpTreeSignatureFactory signatureFactory = OpTreeSignatureFactory.newCache();
+
+  @Test
+  public void checkPersistJoinCondDesc() throws Exception {
+    JoinCondDesc original = new JoinCondDesc(1, 2, 3);
+    JoinCondDesc restored = persistenceLoop(original, JoinCondDesc.class);
+    assertEquals(original, restored);
+  }
+
+  @Test
+  public void checkPersistingSigWorks() throws Exception {
+    OpSignature original = OpSignature.of(getTsOp(3));
+    OpSignature restored = persistenceLoop(original, OpSignature.class);
+    assertEquals(original, restored);
+  }
+
+  @Test
+  public void checkPersistingTreeSigWorks() throws Exception {
+    OpTreeSignature original = signatureFactory.getSignature(getFilTsOp(3, 4));
+    OpTreeSignature restored = persistenceLoop(original, OpTreeSignature.class);
+    assertEquals(original, restored);
+  }
+
+  /**
+   * Builds a diamond (ts -> fil1, fil2 -> fil3) and checks that the shared grandparent is a
+   * single instance both before and after the round trip.
+   */
+  @Test
+  public void checkCanStoreAsGraph() throws Exception {
+    Operator<?> ts = getTsOp(0);
+    Operator<?> fil1 = getFilterOp(1);
+    Operator<?> fil2 = getFilterOp(2);
+    Operator<?> fil3 = getFilterOp(3);
+
+    connectOperators(ts, fil1);
+    connectOperators(ts, fil2);
+    connectOperators(fil1, fil3);
+    connectOperators(fil2, fil3);
+
+    OpTreeSignature original = signatureFactory.getSignature(fil3);
+    OpTreeSignature restored = persistenceLoop(original, OpTreeSignature.class);
+
+    assertEquals(original, restored);
+
+    OpTreeSignature originalViaFirst = original.getParentSig().get(0).getParentSig().get(0);
+    OpTreeSignature originalViaSecond = original.getParentSig().get(1).getParentSig().get(0);
+    assertTrue("these have to be the same instance", originalViaFirst == originalViaSecond);
+
+    OpTreeSignature restoredViaFirst = restored.getParentSig().get(0).getParentSig().get(0);
+    OpTreeSignature restoredViaSecond = restored.getParentSig().get(1).getParentSig().get(0);
+    assertTrue("these have to be the same instance", restoredViaFirst == restoredViaSecond);
+
+    assertEquals(restoredViaFirst, restoredViaSecond);
+  }
+
+  @Test
+  public void checkCanStoreMap() throws Exception {
+    Map<OpTreeSignature, OperatorStats> stats = new HashMap<>();
+    stats.put(signatureFactory.getSignature(getTsOp(0)), new OperatorStats("ts0"));
+    stats.put(signatureFactory.getSignature(getTsOp(1)), new OperatorStats("ts1"));
+
+    RuntimeStatsMap original = new RuntimeStatsMap(stats);
+    RuntimeStatsMap restored = persistenceLoop(original, RuntimeStatsMap.class);
+
+    OpTreeSignature originalKey = original.toMap().keySet().iterator().next();
+    OpTreeSignature restoredKey = restored.toMap().keySet().iterator().next();
+    assertEquals(originalKey, restoredKey);
+    assertEquals(original, restored);
+  }
+
+  /**
+   * Encodes the object, prints the encoded form and decodes it back.
+   *
+   * @param sig the object to persist
+   * @param clazz the type to decode to
+   * @return the decoded object
+   */
+  private <T> T persistenceLoop(T sig, Class<T> clazz) throws IOException {
+    RuntimeStatsPersister persister = RuntimeStatsPersister.INSTANCE;
+    String stored = persister.encode(sig);
+    System.out.println(stored);
+    return persister.decode(stored, clazz);
+  }
+
+  /** Creates a table scan (built from {@code i}) with a filter (built from {@code j}) on top. */
+  private Operator<?> getFilTsOp(int i, int j) {
+    Operator<TableScanDesc> scan = getTsOp(i);
+    Operator<? extends OperatorDesc> filter = getFilterOp(j);
+    connectOperators(scan, filter);
+    return filter;
+  }
+
+  /** Links the two operators in both directions. */
+  private void connectOperators(Operator<?> parent, Operator<?> child) {
+    parent.getChildOperators().add(child);
+    child.getParentOperators().add(parent);
+  }
+
+  /** Creates a filter operator whose predicate is the given constant. */
+  private Operator<? extends OperatorDesc> getFilterOp(int constVal) {
+    ExprNodeDesc predicate = new ExprNodeConstantDesc(constVal);
+    FilterDesc filterDesc = new FilterDesc(predicate, true);
+    return OperatorFactory.get(cCtx, filterDesc);
+  }
+
+  /**
+   * Creates a table scan operator; the given number only appears as a constant argument of the
+   * filter expression set on the scan.
+   */
+  private Operator<TableScanDesc> getTsOp(int i) {
+    Table tblMetadata = new Table("db", "table");
+    // the alias is kept constant; it is intentionally not made unique per operator
+    TableScanDesc scanDesc = new TableScanDesc("alias", tblMetadata);
+    List<ExprNodeDesc> udfArgs =
+        Lists.newArrayList(new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, Integer.valueOf(i)),
+            new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo, "c1", "aa", false));
+    ExprNodeGenericFuncDesc filterExpr =
+        new ExprNodeGenericFuncDesc(TypeInfoFactory.intTypeInfo, udf, udfArgs);
+    scanDesc.setFilterExpr(filterExpr);
+    return OperatorFactory.get(cCtx, scanDesc);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java b/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java
index 8126970..e8a7a1b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java
@@ -33,10 +33,11 @@ import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
import org.apache.hadoop.hive.ql.parse.ParseException;
+import org.apache.hadoop.hive.ql.plan.mapper.EmptyStatsSource;
import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
-import org.apache.hadoop.hive.ql.plan.mapper.SimpleRuntimeStatsSource;
-import org.apache.hadoop.hive.ql.reexec.ReExecDriver;
+import org.apache.hadoop.hive.ql.plan.mapper.StatsSources;
import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper.EquivGroup;
+import org.apache.hadoop.hive.ql.reexec.ReExecDriver;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.OperatorStats;
import org.apache.hadoop.hive.ql.stats.OperatorStatsReaderHook;
@@ -129,7 +130,7 @@ public class TestCounterMapping {
FilterOperator filter1 = filters1.get(0);
driver = createDriver();
- ((ReExecDriver) driver).setStatsSource(new SimpleRuntimeStatsSource(pm1));
+ ((ReExecDriver) driver).setStatsSource(StatsSources.getStatsSourceContaining(EmptyStatsSource.INSTANCE, pm1));
PlanMapper pm2 = getMapperForQuery(driver, query);
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestReOptimization.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestReOptimization.java b/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestReOptimization.java
index b726300..8bec56f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestReOptimization.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestReOptimization.java
@@ -27,15 +27,18 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.plan.Statistics;
import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
+import org.apache.hadoop.hive.ql.plan.mapper.StatsSources;
import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper.EquivGroup;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.OperatorStats;
import org.apache.hadoop.hive.ql.stats.OperatorStatsReaderHook;
import org.apache.hive.testutils.HiveTestEnvSetup;
+import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -55,7 +58,7 @@ public class TestReOptimization {
public static void beforeClass() throws Exception {
IDriver driver = createDriver("");
dropTables(driver);
- String cmds[] = {
+ String[] cmds = {
// @formatter:off
"create table tu(id_uv int,id_uw int,u int)",
"create table tv(id_uv int,v int)",
@@ -78,8 +81,13 @@ public class TestReOptimization {
dropTables(driver);
}
+ @After
+ public void after() {
+ StatsSources.clearGlobalStats();
+ }
+
public static void dropTables(IDriver driver) throws Exception {
- String tables[] = { "tu", "tv", "tw" };
+ String[] tables = new String[] {"tu", "tv", "tw" };
for (String t : tables) {
int ret = driver.run("drop table if exists " + t).getResponseCode();
assertEquals("Checking command success", 0, ret);
@@ -98,7 +106,9 @@ public class TestReOptimization {
@Test
public void testStatsAreSetInReopt() throws Exception {
IDriver driver = createDriver("overlay,reoptimize");
- String query = "select assert_true_oom(${hiveconf:zzz} > sum(u*v)) from tu join tv on (tu.id_uv=tv.id_uv) where u<10 and v>1";
+ String query = "select assert_true_oom(${hiveconf:zzz} > sum(u*v))"
+ + " from tu join tv on (tu.id_uv=tv.id_uv)"
+ + " where u<10 and v>1";
PlanMapper pm = getMapperForQuery(driver, query);
Iterator<EquivGroup> itG = pm.iterateGroups();
@@ -133,7 +143,7 @@ public class TestReOptimization {
IDriver driver = createDriver("overlay,reoptimize");
String query =
"select assert_true_oom(${hiveconf:zzz}>sum(1)) from tu join tv on (tu.id_uv=tv.id_uv) where u<10 and v>1";
- PlanMapper pm = getMapperForQuery(driver, query);
+ getMapperForQuery(driver, query);
}
@@ -143,8 +153,72 @@ public class TestReOptimization {
String query =
"select assert_true(${hiveconf:zzz}>sum(1)) from tu join tv on (tu.id_uv=tv.id_uv) where u<10 and v>1";
+ getMapperForQuery(driver, query);
+ assertEquals(1, driver.getContext().getExecutionIndex());
+ }
+
+ @Test
+ public void testStatCachingQuery() throws Exception {
+ HiveConf conf = env_setup.getTestCtx().hiveConf;
+ conf.setVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE, "query");
+ conf.setBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS, true);
+
+ checkRuntimeStatsReuse(false, false, false);
+ }
+
+ @Test
+ public void testStatCachingHS2() throws Exception {
+ HiveConf conf = env_setup.getTestCtx().hiveConf;
+ conf.setVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE, "hiveserver");
+ conf.setBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS, true);
+
+ checkRuntimeStatsReuse(true, true, false);
+ }
+
+ @Test
+ public void testStatCachingMetaStore() throws Exception {
+ HiveConf conf = env_setup.getTestCtx().hiveConf;
+ conf.setVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE, "metastore");
+ conf.setBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS, true);
+
+ checkRuntimeStatsReuse(true, true, true);
+ }
+
+ private void checkRuntimeStatsReuse(
+ boolean expectInSameSession,
+ boolean expectNewHs2Session,
+ boolean expectHs2Instance) throws CommandProcessorResponse {
+ {
+ // same session
+ IDriver driver = createDriver("reoptimize");
+ checkUsageOfRuntimeStats(driver, false);
+ driver = DriverFactory.newDriver(env_setup.getTestCtx().hiveConf);
+ checkUsageOfRuntimeStats(driver, expectInSameSession);
+ }
+ {
+ // new session
+ IDriver driver = createDriver("reoptimize");
+ checkUsageOfRuntimeStats(driver, expectNewHs2Session);
+ }
+ StatsSources.clearGlobalStats();
+ {
+ // new hs2 instance session
+ IDriver driver = createDriver("reoptimize");
+ checkUsageOfRuntimeStats(driver, expectHs2Instance);
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ private void checkUsageOfRuntimeStats(IDriver driver, boolean expected) throws CommandProcessorResponse {
+ String query = "select sum(u) from tu join tv on (tu.id_uv=tv.id_uv) where u<10 and v>1";
PlanMapper pm = getMapperForQuery(driver, query);
assertEquals(1, driver.getContext().getExecutionIndex());
+ List<CommonJoinOperator> allJoin = pm.getAll(CommonJoinOperator.class);
+ CommonJoinOperator join = allJoin.iterator().next();
+ Statistics joinStat = join.getStatistics();
+
+ assertEquals("expectation of the usage of runtime stats doesn't match", expected,
+ joinStat.isRuntimeStats());
}
@Test
@@ -152,7 +226,7 @@ public class TestReOptimization {
IDriver driver = createDriver("overlay,reoptimize");
String query = "explain reoptimization select 1 from tu join tv on (tu.id_uv=tv.id_uv) where u<10 and v>1";
- PlanMapper pm = getMapperForQuery(driver, query);
+ getMapperForQuery(driver, query);
List<String> res = new ArrayList<>();
List<String> res1 = new ArrayList<>();
while (driver.getResults(res1)) {
@@ -165,6 +239,7 @@ public class TestReOptimization {
}
+
private static IDriver createDriver(String strategies) {
HiveConf conf = env_setup.getTestCtx().hiveConf;
http://git-wip-us.apache.org/repos/asf/hive/blob/b3e2d8a0/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 1642357..e373628 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -81,6 +81,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveMaterializedViewsRegistry;
import org.apache.hadoop.hive.ql.metadata.events.NotificationEventPoll;
+import org.apache.hadoop.hive.ql.plan.mapper.StatsSources;
import org.apache.hadoop.hive.ql.session.ClearDanglingScratchDir;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
@@ -247,6 +248,8 @@ public class HiveServer2 extends CompositeService {
// Create views registry
HiveMaterializedViewsRegistry.get().init();
+ StatsSources.initialize(hiveConf);
+
// Setup cache if enabled.
if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) {
try {