Posted to commits@hive.apache.org by sa...@apache.org on 2019/01/23 10:55:10 UTC
[2/2] hive git commit: HIVE-21078: Replicate column and table level statistics for unpartitioned Hive tables (Ashutosh Bapat, reviewed by Sankar Hariappan)
HIVE-21078: Replicate column and table level statistics for unpartitioned Hive tables (Ashutosh Bapat, reviewed by Sankar Hariappan)
Signed-off-by: Sankar Hariappan <sa...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2ffca04a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2ffca04a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2ffca04a
Branch: refs/heads/master
Commit: 2ffca04a8b58979b4995a5a6eb264f8f59d9b425
Parents: eba9646
Author: Ashutosh Bapat <ab...@cloudera.com>
Authored: Wed Jan 23 16:21:04 2019 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Wed Jan 23 16:24:28 2019 +0530
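
For context, here is a minimal sketch of the end-to-end flow this patch covers, written in the style of the new TestStatsReplicationScenarios test added below (primary and replica are WarehouseInstance handles; the database and table names are illustrative):

    WarehouseInstance.Tuple dump = primary.run("use " + primaryDbName)
        .run("create table t1 (id int)")
        .run("insert into t1 values (1), (2)")
        // With hive.stats.autogather=false, gather column stats explicitly.
        .run("analyze table t1 compute statistics for columns")
        .dump(primaryDbName, null, Collections.emptyList());

    replica.load(replicatedDbName, dump.dumpLocation);

    // After the load, column statistics on the replica are expected to match
    // those on the primary.
    Assert.assertEquals(
        primary.getTableColumnStatistics(primaryDbName, "t1"),
        replica.getTableColumnStatistics(replicatedDbName, "t1"));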
----------------------------------------------------------------------
.../listener/DbNotificationListener.java | 1 +
.../hive/ql/parse/TestReplicationScenarios.java | 1 +
.../TestReplicationScenariosAcidTables.java | 1 +
...TestReplicationScenariosAcrossInstances.java | 1 +
.../ql/parse/TestStatsReplicationScenarios.java | 306 +++++++++++++++++++
...stStatsReplicationScenariosNoAutogather.java | 55 ++++
.../hadoop/hive/ql/parse/WarehouseInstance.java | 41 +++
.../hive/ql/exec/ColumnStatsUpdateTask.java | 32 +-
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 13 +-
.../apache/hadoop/hive/ql/exec/MoveTask.java | 18 +-
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 6 +-
.../events/filesystem/FSTableEvent.java | 5 +
.../repl/bootstrap/load/table/LoadTable.java | 1 +
.../apache/hadoop/hive/ql/metadata/Hive.java | 41 ++-
.../ql/metadata/SessionHiveMetaStoreClient.java | 22 +-
.../apache/hadoop/hive/ql/metadata/Table.java | 27 ++
.../hive/ql/parse/ImportSemanticAnalyzer.java | 11 +-
.../hive/ql/parse/repl/dump/HiveWrapper.java | 8 +-
.../hive/ql/parse/repl/dump/TableExport.java | 6 +-
.../repl/dump/events/AlterTableHandler.java | 9 +
.../repl/dump/events/CreateTableHandler.java | 8 +
.../dump/events/UpdateTableColStatHandler.java | 21 +-
.../load/message/UpdateTableColStatHandler.java | 37 ++-
.../hive/ql/plan/ColumnStatsUpdateWork.java | 15 +
.../hadoop/hive/ql/plan/CreateTableDesc.java | 40 ++-
.../hadoop/hive/ql/plan/ImportTableDesc.java | 3 +-
.../apache/hadoop/hive/ql/plan/MoveWork.java | 9 +
.../hadoop/hive/ql/exec/TestExecDriver.java | 2 +-
.../hive/metastore/api/GetTableRequest.java | 111 ++++++-
.../apache/hadoop/hive/metastore/api/Table.java | 116 ++++++-
.../src/gen/thrift/gen-php/metastore/Types.php | 51 ++++
.../gen/thrift/gen-py/hive_metastore/ttypes.py | 31 +-
.../gen/thrift/gen-rb/hive_metastore_types.rb | 8 +-
.../hadoop/hive/common/StatsSetupConst.java | 29 ++
.../hive/metastore/HiveMetaStoreClient.java | 17 ++
.../hadoop/hive/metastore/IMetaStoreClient.java | 44 +++
.../src/main/thrift/hive_metastore.thrift | 6 +-
.../hadoop/hive/metastore/HiveMetaStore.java | 61 +++-
.../hadoop/hive/metastore/ObjectStore.java | 4 +-
.../apache/hadoop/hive/metastore/RawStore.java | 1 -
.../hive/metastore/cache/CachedStore.java | 1 +
.../events/UpdateTableColumnStatEvent.java | 11 +-
.../metastore/messaging/MessageBuilder.java | 5 +-
.../messaging/UpdateTableColumnStatMessage.java | 4 +
.../json/JSONUpdateTableColumnStatMessage.java | 13 +-
.../HiveMetaStoreClientPreCatalog.java | 12 +
.../apache/hadoop/hive/metastore/TestStats.java | 89 +++---
47 files changed, 1236 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index fa7ab25..81b35a4 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -757,6 +757,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
public void onUpdateTableColumnStat(UpdateTableColumnStatEvent updateTableColumnStatEvent) throws MetaException {
UpdateTableColumnStatMessage msg = MessageBuilder.getInstance()
.buildUpdateTableColumnStatMessage(updateTableColumnStatEvent.getColStats(),
+ updateTableColumnStatEvent.getTableObj(),
updateTableColumnStatEvent.getTableParameters(),
updateTableColumnStatEvent.getValidWriteIds(), updateTableColumnStatEvent.getWriteId());
NotificationEvent event = new NotificationEvent(0, now(), EventType.UPDATE_TABLE_COLUMN_STAT.toString(),
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index c85a2a4..6e9c443 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -570,6 +570,7 @@ public class TestReplicationScenarios {
@Nullable
@Override
public Table apply(@Nullable Table table) {
+ LOG.info("Performing injection on table " + table.getTableName());
if (table.getTableName().equalsIgnoreCase("ptned")){
injectionPathCalled = true;
return null;
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 4472a61..342985e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -66,6 +66,7 @@ import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLI
* TestReplicationScenariosAcidTables - test replication for ACID tables
*/
public class TestReplicationScenariosAcidTables {
+
@Rule
public final TestName testName = new TestName();
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 0df99b3..1adec4e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -305,6 +305,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create table t1 (id int)")
+ .run("insert into t1 values (1), (2)")
.run("create table t2 (place string) partitioned by (country string)")
.run("insert into table t2 partition(country='india') values ('bangalore')")
.run("insert into table t2 partition(country='us') values ('austin')")
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java
new file mode 100644
index 0000000..8815a13
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
+import org.apache.hadoop.hive.shims.Utils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+
+/**
+ * Tests for statistics replication.
+ */
+public class TestStatsReplicationScenarios {
+ @Rule
+ public final TestName testName = new TestName();
+
+ protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
+ static WarehouseInstance primary;
+ private static WarehouseInstance replica;
+ private String primaryDbName, replicatedDbName;
+ private static HiveConf conf;
+ private static boolean hasAutogather;
+
+ @BeforeClass
+ public static void classLevelSetup() throws Exception {
+ Map<String, String> overrides = new HashMap<>();
+ overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+ GzipJSONMessageEncoder.class.getCanonicalName());
+
+ internalBeforeClassSetup(overrides, TestReplicationScenarios.class, true);
+ }
+
+ static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz,
+ boolean autogather)
+ throws Exception {
+ conf = new HiveConf(clazz);
+ conf.set("dfs.client.use.datanode.hostname", "true");
+ conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+ MiniDFSCluster miniDFSCluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+ Map<String, String> localOverrides = new HashMap<String, String>() {{
+ put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+ put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true");
+ }};
+ localOverrides.putAll(overrides);
+ replica = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
+
+ // Run with autogather false on primary if requested
+ hasAutogather = autogather;
+ localOverrides.put(HiveConf.ConfVars.HIVESTATSAUTOGATHER.varname,
+ autogather ? "true" : "false");
+ primary = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
+ }
+
+ @AfterClass
+ public static void classLevelTearDown() throws IOException {
+ primary.close();
+ replica.close();
+ }
+
+ @Before
+ public void setup() throws Throwable {
+ primaryDbName = testName.getMethodName() + "_" + System.currentTimeMillis();
+ replicatedDbName = "replicated_" + primaryDbName;
+ primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
+ SOURCE_OF_REPLICATION + "' = '1,2,3')");
+ }
+
+ @After
+ public void tearDown() throws Throwable {
+ primary.run("drop database if exists " + primaryDbName + " cascade");
+ replica.run("drop database if exists " + replicatedDbName + " cascade");
+ }
+
+
+ private Map<String, String> collectStatsParams(Map<String, String> allParams) {
+ Map<String, String> statsParams = new HashMap<String, String>();
+ List<String> params = new ArrayList<>(StatsSetupConst.SUPPORTED_STATS);
+ params.add(StatsSetupConst.COLUMN_STATS_ACCURATE);
+ for (String param : params) {
+ String value = allParams.get(param);
+ if (value != null) {
+ statsParams.put(param, value);
+ }
+ }
+
+ return statsParams;
+ }
+
+ private void verifyReplicatedStatsForTable(String tableName) throws Exception {
+ // Test column stats
+ Assert.assertEquals(primary.getTableColumnStatistics(primaryDbName, tableName),
+ replica.getTableColumnStatistics(replicatedDbName, tableName));
+
+ // Test table level stats
+ Map<String, String> rParams =
+ collectStatsParams(replica.getTable(replicatedDbName, tableName).getParameters());
+ Map<String, String> pParams =
+ collectStatsParams(primary.getTable(primaryDbName, tableName).getParameters());
+ Assert.assertEquals(pParams, rParams);
+ }
+
+ private void verifyNoStatsReplicationForMetadataOnly(String tableName) throws Throwable {
+ // Test column stats
+ Assert.assertTrue(replica.getTableColumnStatistics(replicatedDbName, tableName).isEmpty());
+
+ // When no data is replicated, the basic stats parameters for the table should look as
+ // if it's a new table created on the replica. Based on the create-table rules the basic
+ // stats may be true or false. Either is fine with us, so don't check exact values.
+ Map<String, String> rParams =
+ collectStatsParams(replica.getTable(replicatedDbName, tableName).getParameters());
+ List<String> params = new ArrayList<>(StatsSetupConst.SUPPORTED_STATS);
+ Map<String, String> expectedFalseParams = new HashMap<>();
+ Map<String, String> expectedTrueParams = new HashMap<>();
+ StatsSetupConst.setStatsStateForCreateTable(expectedTrueParams,
+ replica.getTableColNames(replicatedDbName, tableName), StatsSetupConst.TRUE);
+ StatsSetupConst.setStatsStateForCreateTable(expectedFalseParams,
+ replica.getTableColNames(replicatedDbName, tableName), StatsSetupConst.FALSE);
+ Assert.assertTrue(rParams.equals(expectedFalseParams) || rParams.equals(expectedTrueParams));
+ }
+
+ private List<String> createBootStrapData() throws Throwable {
+ String simpleTableName = "sTable";
+ String partTableName = "pTable";
+ String ndTableName = "ndTable";
+
+ primary.run("use " + primaryDbName)
+ .run("create table " + simpleTableName + " (id int)")
+ .run("insert into " + simpleTableName + " values (1), (2)")
+ .run("create table " + partTableName + " (place string) partitioned by (country string)")
+ .run("insert into table " + partTableName + " partition(country='india') values ('bangalore')")
+ .run("insert into table " + partTableName + " partition(country='us') values ('austin')")
+ .run("insert into table " + partTableName + " partition(country='france') values ('paris')")
+ .run("create table " + ndTableName + " (str string)");
+
+ List<String> tableNames = new ArrayList<String>(Arrays.asList(simpleTableName, partTableName,
+ ndTableName));
+
+ // Run analyze on each of the tables, if stats are not being gathered automatically.
+ if (!hasAutogather) {
+ for (String name : tableNames) {
+ Assert.assertTrue(primary.getTableColumnStatistics(primaryDbName, name).isEmpty());
+ primary.run("use " + primaryDbName)
+ .run("analyze table " + name + " compute statistics for columns");
+ }
+ }
+
+ return tableNames;
+ }
+
+ /**
+ * Dumps primaryDbName on the primary, loads it on the replica as replicatedDbName, and
+ * verifies that the statistics loaded are the same as those on the primary.
+ * @param tableNames names of tables on the primary expected to be loaded
+ * @param lastReplicationId of the last dump, for incremental dump/load
+ * @param parallelLoad if true, parallel bootstrap load is used
+ * @param metadataOnly if true, only metadata is dumped and loaded
+ * @return lastReplicationId of the dump performed.
+ */
+ private String dumpLoadVerify(List<String> tableNames, String lastReplicationId,
+ boolean parallelLoad, boolean metadataOnly)
+ throws Throwable {
+ List<String> withClauseList;
+ // Parallel load works only for bootstrap.
+ parallelLoad = parallelLoad && (lastReplicationId == null);
+
+ // With clause construction for REPL DUMP command.
+ if (metadataOnly) {
+ withClauseList = Collections.singletonList("'hive.repl.dump.metadata.only'='true'");
+ } else {
+ withClauseList = Collections.emptyList();
+ }
+
+ // Take dump
+ WarehouseInstance.Tuple dumpTuple = primary.run("use " + primaryDbName)
+ .dump(primaryDbName, lastReplicationId, withClauseList);
+
+ // Load, changing configuration if necessary.
+ if (parallelLoad && lastReplicationId == null) {
+ replica.hiveConf.setBoolVar(HiveConf.ConfVars.EXECPARALLEL, true);
+ }
+
+ replica.load(replicatedDbName, dumpTuple.dumpLocation)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(tableNames.toArray(new String[1]));
+
+ // Metadata load may not load all the events.
+ if (!metadataOnly) {
+ replica.run("repl status " + replicatedDbName)
+ .verifyResult(dumpTuple.lastReplicationId);
+ }
+
+ if (parallelLoad) {
+ replica.hiveConf.setBoolVar(HiveConf.ConfVars.EXECPARALLEL, false);
+ }
+
+ // Test statistics
+ for (String name : tableNames) {
+ if (metadataOnly) {
+ verifyNoStatsReplicationForMetadataOnly(name);
+ } else {
+ verifyReplicatedStatsForTable(name);
+ }
+ }
+
+ return dumpTuple.lastReplicationId;
+ }
+
+ private void createIncrementalData(List<String> tableNames) throws Throwable {
+ String simpleTableName = "sTable";
+ String partTableName = "pTable";
+ String ndTableName = "ndTable";
+
+ Assert.assertTrue(tableNames.containsAll(Arrays.asList(simpleTableName, partTableName,
+ ndTableName)));
+ String incTableName = "iTable"; // New table
+
+ primary.run("use " + primaryDbName)
+ .run("insert into " + simpleTableName + " values (3), (4)")
+ // new data inserted into table
+ .run("insert into " + ndTableName + " values ('string1'), ('string2')")
+ // two partitions changed and one unchanged
+ .run("insert into table " + partTableName + " values ('india', 'pune')")
+ .run("insert into table " + partTableName + " values ('us', 'chicago')")
+ // new partition
+ .run("insert into table " + partTableName + " values ('australia', 'perth')")
+ .run("create table " + incTableName + " (config string, enabled boolean)")
+ .run("insert into " + incTableName + " values ('conf1', true)")
+ .run("insert into " + incTableName + " values ('conf2', false)");
+ tableNames.add(incTableName);
+
+ // Run analyze on each of the tables, if stats are not being gathered automatically.
+ if (!hasAutogather) {
+ for (String name : tableNames) {
+ primary.run("use " + primaryDbName)
+ .run("analyze table " + name + " compute statistics for columns");
+ }
+ }
+
+ }
+
+ public void testStatsReplicationCommon(boolean parallelBootstrap, boolean metadataOnly) throws Throwable {
+ List<String> tableNames = createBootStrapData();
+ String lastReplicationId = dumpLoadVerify(tableNames, null, parallelBootstrap,
+ metadataOnly);
+
+ // Incremental dump
+ createIncrementalData(tableNames);
+ lastReplicationId = dumpLoadVerify(tableNames, lastReplicationId, parallelBootstrap,
+ metadataOnly);
+ }
+
+ @Test
+ public void testForNonAcidTables() throws Throwable {
+ testStatsReplicationCommon(false, false);
+ }
+
+ @Test
+ public void testForNonAcidTablesParallelBootstrapLoad() throws Throwable {
+ testStatsReplicationCommon(true, false);
+ }
+
+ @Test
+ public void testNonAcidMetadataOnlyDump() throws Throwable {
+ testStatsReplicationCommon(false, true);
+ }
+}
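
As a self-contained sketch of what verifyNoStatsReplicationForMetadataOnly above accepts (the column name "id" is illustrative), the two candidate parameter maps can be built directly with StatsSetupConst:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.hive.common.StatsSetupConst;

    public class NewTableStatsStateSketch {
      public static void main(String[] args) {
        // A metadata-only load leaves the replica table looking freshly
        // created, so its stats state follows the create-table rules: either
        // all-true (an empty table's zero counts are exact) or all-false.
        Map<String, String> expectedTrue = new HashMap<>();
        Map<String, String> expectedFalse = new HashMap<>();
        StatsSetupConst.setStatsStateForCreateTable(expectedTrue,
            Arrays.asList("id"), StatsSetupConst.TRUE);
        StatsSetupConst.setStatsStateForCreateTable(expectedFalse,
            Arrays.asList("id"), StatsSetupConst.FALSE);
        System.out.println(expectedTrue);
        System.out.println(expectedFalse);
      }
    }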
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
new file mode 100644
index 0000000..f58ddb8
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Assert;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests statistics replication when statistics are collected using the ANALYZE command.
+ */
+public class TestStatsReplicationScenariosNoAutogather extends TestStatsReplicationScenarios {
+ @Rule
+ public final TestName testName = new TestName();
+
+ protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
+ static WarehouseInstance primary;
+ private static WarehouseInstance replica;
+ private String primaryDbName, replicatedDbName;
+ private static HiveConf conf;
+
+ @BeforeClass
+ public static void classLevelSetup() throws Exception {
+ Map<String, String> overrides = new HashMap<>();
+ overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+ GzipJSONMessageEncoder.class.getCanonicalName());
+
+ internalBeforeClassSetup(overrides, TestReplicationScenarios.class, false);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index bf4154c..b272f06 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -380,6 +381,46 @@ public class WarehouseInstance implements Closeable {
}
}
+ /**
+ * Get statistics for all columns of a given table in the given database.
+ * @param dbName - the database where the table resides
+ * @param tableName - name of the table whose statistics are to be retrieved
+ * @return - list of ColumnStatisticsObj objects in the order of the table's columns
+ */
+ public List<ColumnStatisticsObj> getTableColumnStatistics(String dbName, String tableName) throws Exception {
+ return client.getTableColumnStatistics(dbName, tableName, getTableColNames(dbName, tableName));
+ }
+
+ /**
+ * @param dbName database name
+ * @param tableName table name
+ * @return list of column names of the given table in the given database.
+ * @throws Exception
+ */
+ public List<String> getTableColNames(String dbName, String tableName) throws Exception {
+ List<String> colNames = new ArrayList<>();
+ client.getSchema(dbName, tableName).forEach(fs -> colNames.add(fs.getName()));
+ return colNames;
+ }
+
+ /**
+ * Get statistics for given set of columns for all the partitions of a given table in the given
+ * database.
+ * @param dbName - the database where the table resides
+ * @param tableName - name of the partitioned table in the database
+ * @param colNames - columns whose statistics are to be retrieved
+ * @return Map of partition name and list of ColumnStatisticsObj. The objects in the list are
+ * ordered according to the given list of columns.
+ * @throws Exception
+ */
+ Map<String, List<ColumnStatisticsObj>> getAllPartitionColumnStatistics(String dbName,
+ String tableName,
+ List<String> colNames)
+ throws Exception {
+ return client.getPartitionColumnStatistics(dbName, tableName,
+ client.listPartitionNames(dbName, tableName, (short) -1), colNames);
+ }
+
public List<Partition> getAllPartitions(String dbName, String tableName) throws Exception {
try {
return client.listPartitions(dbName, tableName, Short.MAX_VALUE);
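
A quick usage sketch for the new helpers (a WarehouseInstance named warehouse and the database/table names are illustrative):

    // All column statistics of an unpartitioned table:
    List<ColumnStatisticsObj> tableStats =
        warehouse.getTableColumnStatistics("somedb", "stable");

    // Per-partition column statistics of a partitioned table, keyed by
    // partition name:
    Map<String, List<ColumnStatisticsObj>> partStats =
        warehouse.getAllPartitionColumnStatistics("somedb", "ptable",
            warehouse.getTableColNames("somedb", "ptable"));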
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
index 61fb3d3..cf00d7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
@@ -21,11 +21,11 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -77,6 +78,14 @@ public class ColumnStatsUpdateTask extends Task<ColumnStatsUpdateWork> {
private ColumnStatistics constructColumnStatsFromInput()
throws SemanticException, MetaException {
+ // If we are replicating the stats, we don't need to construct those again.
+ if (work.getColStats() != null) {
+ ColumnStatistics colStats = work.getColStats();
+ LOG.debug("Got stats through replication for " +
+ colStats.getStatsDesc().getDbName() + "." +
+ colStats.getStatsDesc().getTableName());
+ return colStats;
+ }
String dbName = work.dbName();
String tableName = work.getTableName();
String partName = work.getPartName();
@@ -287,9 +296,22 @@ public class ColumnStatsUpdateTask extends Task<ColumnStatsUpdateWork> {
}
private int persistColumnStats(Hive db) throws HiveException, MetaException, IOException {
- List<ColumnStatistics> colStats = new ArrayList<>();
- colStats.add(constructColumnStatsFromInput());
- SetPartitionsStatsRequest request = new SetPartitionsStatsRequest(colStats);
+ ColumnStatistics colStats = constructColumnStatsFromInput();
+ ColumnStatisticsDesc colStatsDesc = colStats.getStatsDesc();
+ // We do not support stats replication for a transactional table yet. If we are converting
+ // a non-transactional table to a transactional table during replication, we might get
+ // column statistics but we shouldn't update those.
+ if (work.getColStats() != null &&
+ AcidUtils.isTransactionalTable(getHive().getTable(colStatsDesc.getDbName(),
+ colStatsDesc.getTableName()))) {
+ LOG.debug("Skipped updating column stats for table " +
+ TableName.getDbTable(colStatsDesc.getDbName(), colStatsDesc.getTableName()) +
+ " because it is converted to a transactional table during replication.");
+ return 0;
+ }
+
+ SetPartitionsStatsRequest request =
+ new SetPartitionsStatsRequest(Collections.singletonList(colStats));
db.setPartitionColumnStatistics(request);
return 0;
}
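
The replication path that feeds a pre-built ColumnStatistics into this task is the load-side UpdateTableColStatHandler further down in this patch; a condensed sketch of that hand-off (utcsm is the deserialized event message):

    // Stats deserialized from the replication event are passed through
    // unchanged, so constructColumnStatsFromInput() returns them as-is
    // instead of rebuilding them from query input.
    ColumnStatistics colStats = utcsm.getColumnStatistics();
    Task<?> statsUpdateTask =
        TaskFactory.get(new ColumnStatsUpdateWork(colStats), hiveConf);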
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index dfa7e5e..cb7fdf7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4724,6 +4724,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
if (crtTbl.getReplaceMode()) {
ReplicationSpec replicationSpec = crtTbl.getReplicationSpec();
long writeId = 0;
+ EnvironmentContext environmentContext = null;
if (replicationSpec != null && replicationSpec.isInReplicationScope()) {
if (replicationSpec.isMigratingToTxnTable()) {
// for migration we start the transaction and allocate write id in repl txn task for migration.
@@ -4735,11 +4736,19 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
} else {
writeId = crtTbl.getReplWriteId();
}
+
+ // In case of replication, statistics are obtained from the source, so do not update
+ // them on the replica. Since we are not replicating statistics for transactional
+ // tables yet, do not do so for transactional tables right now.
+ if (!AcidUtils.isTransactionalTable(crtTbl)) {
+ environmentContext = new EnvironmentContext();
+ environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
+ }
}
// replace-mode creates are really alters using CreateTableDesc.
- db.alterTable(tbl.getCatName(), tbl.getDbName(), tbl.getTableName(), tbl, false, null,
- true, writeId);
+ db.alterTable(tbl.getCatName(), tbl.getDbName(), tbl.getTableName(), tbl, false,
+ environmentContext, true, writeId);
} else {
if ((foreignKeys != null && foreignKeys.size() > 0) ||
(primaryKeys != null && primaryKeys.size() > 0) ||
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index ca4391f..fb35c79 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -398,8 +398,24 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
Utilities.FILE_OP_LOGGER.trace("loadTable called from " + tbd.getSourcePath()
+ " into " + tbd.getTable().getTableName());
}
+
+ boolean resetStatistics;
+ if (hasFollowingStatsTask()) {
+ // If there's a follow-on stats task, the stats will be correct after the load, so
+ // there is no need to reset the statistics.
+ resetStatistics = false;
+ } else if (!work.getIsInReplicationScope()) {
+ // If the load is not happening during replication and there is no follow-on stats
+ // task, stats will be inaccurate after the load and so need to be reset.
+ resetStatistics = true;
+ } else {
+ // If we are loading a table during replication, the stats are also replicated and
+ // hence accurate, provided it's a non-transactional table. For transactional tables
+ // we do not replicate stats yet.
+ resetStatistics = AcidUtils.isTransactionalTable(table.getParameters());
+ }
db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getLoadFileType(),
- work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isFullAcidOp, hasFollowingStatsTask(),
+ work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isFullAcidOp, resetStatistics,
tbd.getWriteId(), tbd.getStmtId(), tbd.isInsertOverwrite());
if (work.getOutputs() != null) {
DDLTask.addIfAbsentByName(new WriteEntity(table,
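
For reference, the resetStatistics decision above collapses to this table:

    hasFollowingStatsTask | in replication scope | transactional table | resetStatistics
    ----------------------+----------------------+---------------------+----------------
    true                  | any                  | any                 | false
    false                 | false                | any                 | true
    false                 | true                 | false               | false
    false                 | true                 | true                | true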
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index a5b944b..947bfcf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -257,7 +257,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
LOG.debug(
"analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
try {
- HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName);
+ HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName,
+ conf);
boolean shouldWriteExternalTableLocationInfo =
conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
&& TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())
@@ -335,6 +336,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
// added/modified by concurrent txns which are later than current txn. So, need to set last repl Id of this table
// as bootstrap dump's last repl Id.
tuple.replicationSpec.setCurrentReplicationState(String.valueOf(lastReplId));
+
+ // For now we do not replicate stats for ACID table. So, wipe out column stats if any.
+ tableSpec.tableHandle.getTTable().unsetColStats();
}
MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
new TableExport(
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
index 599eb04..d57cbd1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
@@ -97,6 +97,11 @@ public class FSTableEvent implements TableEvent {
// If the conversion is from non transactional to transactional table
if (AcidUtils.isTransactionalTable(table)) {
replicationSpec().setMigratingToTxnTable();
+ // There won't be any writeId associated with statistics on a source non-transactional
+ // table. We would need to associate a cooked-up writeId on the target for those, but
+ // that's not done yet. Until then we don't replicate statistics for ACID tables even
+ // if they are available on the source.
+ table.getTTable().unsetColStats();
}
if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
// since we have converted to an external table now after applying the migration rules the
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index e0f0979..0d1a88c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -319,6 +319,7 @@ public class LoadTable {
);
moveWork.setLoadTableWork(loadTableWork);
}
+ moveWork.setIsInReplicationScope(replicationSpec.isInReplicationScope());
Task<?> loadTableTask = TaskFactory.get(moveWork, context.hiveConf);
copyTask.addDependentTask(loadTableTask);
return copyTask;
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index c017790..cd59efb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1241,8 +1241,30 @@ public class Hive {
* @return the table or if throwException is false a null value.
* @throws HiveException
*/
- public Table getTable(final String dbName, final String tableName,
- boolean throwException, boolean checkTransactional) throws HiveException {
+ public Table getTable(final String dbName, final String tableName, boolean throwException,
+ boolean checkTransactional) throws HiveException {
+ return getTable(dbName, tableName, throwException, checkTransactional, false);
+ }
+
+ /**
+ * Returns metadata of the table.
+ *
+ * @param dbName
+ * the name of the database
+ * @param tableName
+ * the name of the table
+ * @param throwException
+ * controls whether an exception is thrown or a null value is returned
+ * @param checkTransactional
+ * checks whether the table's metadata stats are valid for (i.e., compliant
+ * with the snapshot isolation of) the current transaction.
+ * @param getColumnStats
+ * get column statistics if available
+ * @return the table or if throwException is false a null value.
+ * @throws HiveException
+ */
+ public Table getTable(final String dbName, final String tableName, boolean throwException,
+ boolean checkTransactional, boolean getColumnStats) throws HiveException {
if (tableName == null || tableName.equals("")) {
throw new HiveException("empty table creation??");
@@ -1261,9 +1283,9 @@ public class Hive {
dbName, tableName);
}
tTable = getMSC().getTable(getDefaultCatalog(conf), dbName, tableName,
- validWriteIdList != null ? validWriteIdList.toString() : null);
+ validWriteIdList != null ? validWriteIdList.toString() : null, getColumnStats);
} else {
- tTable = getMSC().getTable(dbName, tableName);
+ tTable = getMSC().getTable(dbName, tableName, getColumnStats);
}
} catch (NoSuchObjectException e) {
if (throwException) {
@@ -2755,14 +2777,13 @@ private void constructOneLBLocationMap(FileStatus fSta,
* If the source directory is LOCAL
* @param isSkewedStoreAsSubdir
* if list bucketing enabled
- * @param hasFollowingStatsTask
- * if there is any following stats task
* @param isAcidIUDoperation true if this is an ACID based Insert [overwrite]/update/delete
+ * @param resetStatistics whether statistics should be reset as part of the move
* @param writeId write ID allocated for the current load operation
* @param stmtId statement ID of the current load statement
*/
public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal,
- boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean hasFollowingStatsTask,
+ boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean resetStatistics,
Long writeId, int stmtId, boolean isInsertOverwrite) throws HiveException {
PerfLogger perfLogger = SessionState.getPerfLogger();
@@ -2835,11 +2856,13 @@ private void constructOneLBLocationMap(FileStatus fSta,
perfLogger.PerfLogEnd("MoveTask", PerfLogger.FILE_MOVES);
}
if (!this.getConf().getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
+ LOG.debug("setting table statistics false for " + tbl.getDbName() + "." + tbl.getTableName());
StatsSetupConst.setBasicStatsState(tbl.getParameters(), StatsSetupConst.FALSE);
}
//column stats will be inaccurate
- if (!hasFollowingStatsTask) {
+ if (resetStatistics) {
+ LOG.debug("Clearing table statistics for " + tbl.getDbName() + "." + tbl.getTableName());
StatsSetupConst.clearColumnStatsState(tbl.getParameters());
}
@@ -2858,7 +2881,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
EnvironmentContext environmentContext = null;
- if (hasFollowingStatsTask) {
+ if (!resetStatistics) {
environmentContext = new EnvironmentContext();
environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
index 322b580..83cb3ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
@@ -174,13 +174,20 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
@Override
public org.apache.hadoop.hive.metastore.api.Table getTable(String dbname, String name) throws MetaException,
TException, NoSuchObjectException {
+ return getTable(dbname, name, false);
+ }
+
+ @Override
+ public org.apache.hadoop.hive.metastore.api.Table getTable(String dbname, String name,
+ boolean getColStats) throws MetaException,
+ TException, NoSuchObjectException {
// First check temp tables
org.apache.hadoop.hive.metastore.api.Table table = getTempTable(dbname, name);
if (table != null) {
return deepCopy(table); // Original method used deepCopy(), do the same here.
}
// Try underlying client
- return super.getTable(MetaStoreUtils.getDefaultCatalog(conf), dbname, name);
+ return super.getTable(MetaStoreUtils.getDefaultCatalog(conf), dbname, name, getColStats);
}
// Need to override this one too or dropTable breaks because it doesn't find the table when checks
@@ -188,10 +195,19 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
@Override
public org.apache.hadoop.hive.metastore.api.Table getTable(String catName, String dbName,
String tableName) throws TException {
+ return getTable(catName, dbName, tableName, false);
+ }
+
+ // Need to override this one too or dropTable breaks because it doesn't find the table when checks
+ // before the drop.
+ @Override
+ public org.apache.hadoop.hive.metastore.api.Table getTable(String catName, String dbName,
+ String tableName, boolean getColStats)
+ throws TException {
if (!DEFAULT_CATALOG_NAME.equals(catName)) {
- return super.getTable(catName, dbName, tableName);
+ return super.getTable(catName, dbName, tableName, getColStats);
} else {
- return getTable(dbName, tableName);
+ return getTable(dbName, tableName, getColStats);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
index 89b2db3..cd483eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
@@ -36,11 +36,13 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.CreationMetadata;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -53,6 +55,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
@@ -1106,4 +1109,28 @@ public class Table implements Serializable {
public Boolean isOutdatedForRewriting() {
return outdatedForRewritingMaterializedView;
}
+
+ public ColumnStatistics getColStats() {
+ return tTable.isSetColStats() ? tTable.getColStats() : null;
+ }
+
+ /**
+ * Set up the table-level stats as if the table is new. Used when setting up a Table
+ * for a new table or during replication.
+ */
+ public void setStatsStateLikeNewTable() {
+ // We do not replicate statistics for an ACID table yet,
+ // so don't touch them here.
+ if (AcidUtils.isTransactionalTable(this)) {
+ return;
+ }
+
+ if (isPartitioned()) {
+ StatsSetupConst.setStatsStateForCreateTable(getParameters(), null,
+ StatsSetupConst.FALSE);
+ } else {
+ StatsSetupConst.setStatsStateForCreateTable(getParameters(),
+ MetaStoreUtils.getColumnNames(getCols()), StatsSetupConst.TRUE);
+ }
+ }
};
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index a843987..6102339 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -284,6 +284,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
//if the conversion is from non transactional to transactional table
if (TxnUtils.isTransactionalTable(tblObj)) {
replicationSpec.setMigratingToTxnTable();
+ // There won't be any writeId associated with statistics on a source non-transactional
+ // table. We would need to associate a cooked-up writeId on the target for those, but
+ // that's not done yet. Until then we don't replicate statistics for ACID tables even
+ // if they are available on the source.
+ tblObj.unsetColStats();
}
tblDesc = getBaseCreateTableDescFromTable(dbname, tblObj);
if (TableType.valueOf(tblObj.getTableType()) == TableType.EXTERNAL_TABLE) {
@@ -302,7 +307,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
boolean inReplicationScope = false;
if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
tblDesc.setReplicationSpec(replicationSpec);
- StatsSetupConst.setBasicStatsState(tblDesc.getTblProps(), StatsSetupConst.FALSE);
+ // Statistics for a non-transactional table will be replicated separately. Don't bother
+ // with them here.
+ if (TxnUtils.isTransactionalTable(tblDesc.getTblProps())) {
+ StatsSetupConst.setBasicStatsState(tblDesc.getTblProps(), StatsSetupConst.FALSE);
+ }
inReplicationScope = true;
tblDesc.setReplWriteId(writeId);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java
index fb8c4ca..2fa3676 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.parse.repl.dump;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -54,8 +55,11 @@ public class HiveWrapper {
return new Tuple<>(functionForSpec, () -> db.getDatabase(dbName));
}
- public Tuple<Table> table(final String tableName) throws HiveException {
- return new Tuple<>(functionForSpec, () -> db.getTable(dbName, tableName));
+ public Tuple<Table> table(final String tableName, HiveConf conf) throws HiveException {
+ // Column statistics won't be accurate if we are dumping only metadata
+ boolean getColStats = !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY);
+ return new Tuple<>(functionForSpec, () -> db.getTable(dbName, tableName, true, false,
+ getColStats));
}
public static class Tuple<T> {
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index b60be88..adc9446 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -72,9 +72,11 @@ public class TableExport {
? null
: tableSpec;
this.replicationSpec = replicationSpec;
- if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) || (this.tableSpec != null
- && this.tableSpec.tableHandle.isView())) {
+ if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) ||
+ (this.tableSpec != null && this.tableSpec.tableHandle.isView())) {
this.replicationSpec.setIsMetadataOnly(true);
+
+ this.tableSpec.tableHandle.setStatsStateLikeNewTable();
}
this.db = db;
this.distCpDoAsUser = distCpDoAsUser;
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
index 00fa370..ff43399 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -94,6 +95,14 @@ class AlterTableHandler extends AbstractEventHandler<AlterTableMessage> {
withinContext.replicationSpec.setIsMetadataOnly(true);
Table qlMdTableAfter = new Table(after);
Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+
+ // If we are not dumping data for a table, we shouldn't be dumping basic statistics
+ // either, since they won't be accurate. So reset them to what they would look like
+ // for an empty table.
+ if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) {
+ qlMdTableAfter.setStatsStateLikeNewTable();
+ }
+
EximUtil.createExportDump(
metaDataPath.getFileSystem(withinContext.hiveConf),
metaDataPath,
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
index 5870876..a8bf671 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -61,6 +62,13 @@ class CreateTableHandler extends AbstractEventHandler<CreateTableMessage> {
withinContext.replicationSpec.setIsMetadataOnly(true);
}
+ // If we are not dumping data for a table, we shouldn't be dumping basic statistics
+ // either, since they won't be accurate. So reset them to what they would look like
+ // for an empty table.
+ if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) {
+ qlMdTable.setStatsStateLikeNewTable();
+ }
+
Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
EximUtil.createExportDump(
metaDataPath.getFileSystem(withinContext.hiveConf),
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
index a3aecde..e50a2bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
@@ -19,14 +19,15 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.UpdateTableColumnStatMessage;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
class UpdateTableColStatHandler extends AbstractEventHandler<UpdateTableColumnStatMessage> {
- UpdateTableColStatHandler(NotificationEvent event) {
- super(event);
- }
+ UpdateTableColStatHandler(NotificationEvent event) { super(event); }
@Override
UpdateTableColumnStatMessage eventMessage(String stringRepresentation) {
@@ -35,6 +36,20 @@ class UpdateTableColStatHandler extends AbstractEventHandler<UpdateTableColumnSt
@Override
public void handle(Context withinContext) throws Exception {
+ Table qlMdTable = new Table(eventMessage.getTableObject());
+ if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) {
+ return;
+ }
+
+ // Statistics without data don't make sense.
+ if (withinContext.replicationSpec.isMetadataOnly()) {
+ return;
+ }
+ // For now we do not replicate the statistics for transactional tables.
+ if (AcidUtils.isTransactionalTable(qlMdTable)) {
+ return;
+ }
+
LOG.info("Processing#{} UpdateTableColumnStat message : {}", fromEventId(), eventMessageAsJSON);
DumpMetaData dmd = withinContext.createDmd(this);
dmd.setPayload(eventMessageAsJSON);
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java
index eb3d18a..9a60de4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java
@@ -17,10 +17,13 @@
*/
package org.apache.hadoop.hive.ql.parse.repl.load.message;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.messaging.UpdateTableColumnStatMessage;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
+import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
import java.io.Serializable;
import java.util.Collections;
@@ -31,13 +34,29 @@ import java.util.List;
* Target(Load) side handler for table stat update event
*/
public class UpdateTableColStatHandler extends AbstractMessageHandler {
- @Override
- public List<Task<? extends Serializable>> handle(Context context)
- throws SemanticException {
- context.log.info("Replication of table stat update event is not supported yet");
- if (!context.isDbNameEmpty()) {
- updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+ @Override
+ public List<Task<? extends Serializable>> handle(Context context)
+ throws SemanticException {
+ UpdateTableColumnStatMessage utcsm =
+ deserializer.getUpdateTableColumnStatMessage(context.dmd.getPayload());
+
+ // Update the table name and database name in the statistics object
+ ColumnStatistics colStats = utcsm.getColumnStatistics();
+ ColumnStatisticsDesc colStatsDesc = colStats.getStatsDesc();
+ colStatsDesc.setDbName(context.dbName);
+ if (!context.isTableNameEmpty()) {
+ colStatsDesc.setTableName(context.tableName);
+ }
+ if (!context.isDbNameEmpty()) {
+ updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName,
+ context.tableName, null);
+ }
+
+ // TODO: For a txn stats update, ColumnStatsUpdateTask.execute() ->
+ // Hive.setPartitionColumnStatistics expects a valid writeId allocated by the
+ // current txn, along with a table snapshot. Neither is available when the update
+ // comes from ReplLoadTask, which does not allocate a write id. Need to check this further.
+ return Collections.singletonList(TaskFactory.get(new ColumnStatsUpdateWork(colStats),
+ context.hiveConf));
}
- return Collections.singletonList(TaskFactory.get(new DependencyCollectionWork(), context.hiveConf));
- }
}
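The ColumnStatistics being remapped above is the Thrift object carried in the dump (all classes from org.apache.hadoop.hive.metastore.api). An illustrative construction, with hypothetical names and an empty per-column list, of the shape the handler expects:

    // Hypothetical: a table-level ColumnStatistics as it arrives from the source.
    // The descriptor still names the source db/table; the handler rewrites these
    // to the target before ColumnStatsUpdateTask persists them.
    ColumnStatisticsDesc desc = new ColumnStatisticsDesc(true /* isTblLevel */, "srcdb", "t1");
    ColumnStatistics colStats = new ColumnStatistics(desc, new ArrayList<ColumnStatisticsObj>());
    desc.setDbName("targetdb");        // what handle() does with context.dbName
    desc.setTableName("t1_replica");   // and with context.tableName, when present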
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
index 6de1a37..1219b62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.plan;
import java.io.Serializable;
import java.util.Map;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
@@ -42,6 +43,7 @@ public class ColumnStatsUpdateWork implements Serializable, DDLDescWithWriteId {
private final String tableName;
private final String colName;
private final String colType;
+ private final ColumnStatistics colStats;
private long writeId;
public ColumnStatsUpdateWork(String partName,
@@ -56,6 +58,17 @@ public class ColumnStatsUpdateWork implements Serializable, DDLDescWithWriteId {
this.tableName = tableName;
this.colName = colName;
this.colType = colType;
+ this.colStats = null;
+ }
+
+ public ColumnStatsUpdateWork(ColumnStatistics colStats) {
+ this.colStats = colStats;
+ this.partName = null;
+ this.mapProp = null;
+ this.dbName = null;
+ this.tableName = null;
+ this.colName = null;
+ this.colType = null;
}
@Override
@@ -87,6 +100,8 @@ public class ColumnStatsUpdateWork implements Serializable, DDLDescWithWriteId {
return colType;
}
+ public ColumnStatistics getColStats() { return colStats; }
+
@Override
public void setWriteId(long writeId) {
this.writeId = writeId;
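The work descriptor now has two construction modes; a hedged illustration follows (the parameter order of the first form is inferred from the field list above and is an assumption):

    // Pre-existing ALTER TABLE ... UPDATE STATISTICS path: one column's stats as a
    // name/type pair plus a key-value map.
    ColumnStatsUpdateWork byColumn = new ColumnStatsUpdateWork(
        null /* partName */, statsMap, "db1", "t1", "c1", "bigint");
    // New repl-load path: a fully formed ColumnStatistics deserialized from the
    // event; all per-column fields stay null.
    ColumnStatsUpdateWork fromEvent = new ColumnStatsUpdateWork(colStats);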
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
index f00148b..c71ff6d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.PartitionManagementTask;
import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.SQLCheckConstraint;
@@ -38,6 +40,7 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.DDLTask;
@@ -106,6 +109,7 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
List<SQLNotNullConstraint> notNullConstraints;
List<SQLDefaultConstraint> defaultConstraints;
List<SQLCheckConstraint> checkConstraints;
+ private ColumnStatistics colStats;
private Long initialMmWriteId; // Initial MM write ID for CTAS and import.
// The FSOP configuration for the FSOP that is going to write initial data during ctas.
// This is not needed beyond compilation, so it is transient.
@@ -127,7 +131,8 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
boolean ifNotExists, List<String> skewedColNames, List<List<String>> skewedColValues,
List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys,
List<SQLUniqueConstraint> uniqueConstraints, List<SQLNotNullConstraint> notNullConstraints,
- List<SQLDefaultConstraint> defaultConstraints, List<SQLCheckConstraint> checkConstraints) {
+ List<SQLDefaultConstraint> defaultConstraints, List<SQLCheckConstraint> checkConstraints,
+ ColumnStatistics colStats) {
this(tableName, isExternal, isTemporary, cols, partCols,
bucketCols, sortCols, numBuckets, fieldDelim, fieldEscape,
@@ -137,6 +142,7 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
primaryKeys, foreignKeys, uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints);
this.databaseName = databaseName;
+ this.colStats = colStats;
}
public CreateTableDesc(String databaseName, String tableName, boolean isExternal, boolean isTemporary,
@@ -157,7 +163,8 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
collItemDelim, mapKeyDelim, lineDelim, comment, inputFormat,
outputFormat, location, serName, storageHandler, serdeProps,
tblProps, ifNotExists, skewedColNames, skewedColValues,
- primaryKeys, foreignKeys, uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints);
+ primaryKeys, foreignKeys, uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints,
+ null);
this.partColNames = partColNames;
this.isCTAS = isCTAS;
}
@@ -878,14 +885,29 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
}
}
- if (!this.isCTAS && (tbl.getPath() == null || (tbl.isEmpty() && !isExternal()))) {
- if (!tbl.isPartitioned() && conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
- StatsSetupConst.setStatsStateForCreateTable(tbl.getTTable().getParameters(),
- MetaStoreUtils.getColumnNames(tbl.getCols()), StatsSetupConst.TRUE);
- }
+ if (colStats != null) {
+ ColumnStatisticsDesc colStatsDesc = new ColumnStatisticsDesc(colStats.getStatsDesc());
+ colStatsDesc.setCatName(tbl.getCatName());
+ colStatsDesc.setTableName(getTableName());
+ colStatsDesc.setDbName(getDatabaseName());
+ tbl.getTTable().setColStats(new ColumnStatistics(colStatsDesc, colStats.getStatsObj()));
+ }
+
+ // The statistics for non-transactional tables are obtained from the source. Do not
+ // reset them on the replica.
+ if (replicationSpec != null && replicationSpec.isInReplicationScope() &&
+ !TxnUtils.isTransactionalTable(tbl.getTTable())) {
+ // Do nothing to the table statistics.
} else {
- StatsSetupConst.setStatsStateForCreateTable(tbl.getTTable().getParameters(), null,
- StatsSetupConst.FALSE);
+ if (!this.isCTAS && (tbl.getPath() == null || (tbl.isEmpty() && !isExternal()))) {
+ if (!tbl.isPartitioned() && conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
+ StatsSetupConst.setStatsStateForCreateTable(tbl.getTTable().getParameters(),
+ MetaStoreUtils.getColumnNames(tbl.getCols()), StatsSetupConst.TRUE);
+ }
+ } else {
+ StatsSetupConst.setStatsStateForCreateTable(tbl.getTTable().getParameters(), null,
+ StatsSetupConst.FALSE);
+ }
}
return tbl;
}
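In sum, the stats handling in this method now has three outcomes; an illustrative summary, not code from the patch:

    // 1. colStats supplied (bootstrap repl/import of a non-txn table): attach the
    //    remapped ColumnStatistics to the Thrift table object.
    // 2. In replication scope and the table is non-transactional: leave the stats
    //    parameters untouched, since the source's values are authoritative.
    // 3. Otherwise (a normal CREATE TABLE): keep the pre-existing behavior, marking
    //    stats accurate for a new empty managed table when autogather is on and
    //    inaccurate in every other case.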
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
index 50b43ba..5c30fca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
@@ -84,7 +84,8 @@ public class ImportTableDesc {
null,
null,
null,
- null);
+ null,
+ table.getColStats());
this.createTblDesc.setStoredAsSubDirectories(table.getSd().isStoredAsSubDirectories());
break;
case VIEW:
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index 47a56d5..8ca8e46 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@ -56,6 +56,7 @@ public class MoveWork implements Serializable {
*/
protected List<Partition> movedParts;
private boolean isNoop;
+ private boolean isInReplicationScope = false;
public MoveWork() {
}
@@ -164,4 +165,12 @@ public class MoveWork implements Serializable {
public void setNeedCleanTarget(boolean needCleanTarget) {
this.needCleanTarget = needCleanTarget;
}
+
+ public void setIsInReplicationScope(boolean isInReplicationScope) {
+ this.isInReplicationScope = isInReplicationScope;
+ }
+
+ public boolean getIsInReplicationScope() {
+ return this.isInReplicationScope;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
index e108684..78f2585 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
@@ -142,7 +142,7 @@ public class TestExecDriver extends TestCase {
db.createTable(src, cols, null, TextInputFormat.class,
HiveIgnoreKeyTextOutputFormat.class);
db.loadTable(hadoopDataFile[i], src, LoadFileType.KEEP_EXISTING,
- true, false, false, false, null, 0, false);
+ true, false, false, true, null, 0, false);
i++;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetTableRequest.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetTableRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetTableRequest.java
index 2804952..3b60695 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetTableRequest.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetTableRequest.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
private static final org.apache.thrift.protocol.TField CAPABILITIES_FIELD_DESC = new org.apache.thrift.protocol.TField("capabilities", org.apache.thrift.protocol.TType.STRUCT, (short)3);
private static final org.apache.thrift.protocol.TField CAT_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("catName", org.apache.thrift.protocol.TType.STRING, (short)4);
private static final org.apache.thrift.protocol.TField VALID_WRITE_ID_LIST_FIELD_DESC = new org.apache.thrift.protocol.TField("validWriteIdList", org.apache.thrift.protocol.TType.STRING, (short)6);
+ private static final org.apache.thrift.protocol.TField GET_COLUMN_STATS_FIELD_DESC = new org.apache.thrift.protocol.TField("getColumnStats", org.apache.thrift.protocol.TType.BOOL, (short)7);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -55,6 +56,7 @@ import org.slf4j.LoggerFactory;
private ClientCapabilities capabilities; // optional
private String catName; // optional
private String validWriteIdList; // optional
+ private boolean getColumnStats; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -62,7 +64,8 @@ import org.slf4j.LoggerFactory;
TBL_NAME((short)2, "tblName"),
CAPABILITIES((short)3, "capabilities"),
CAT_NAME((short)4, "catName"),
- VALID_WRITE_ID_LIST((short)6, "validWriteIdList");
+ VALID_WRITE_ID_LIST((short)6, "validWriteIdList"),
+ GET_COLUMN_STATS((short)7, "getColumnStats");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -87,6 +90,8 @@ import org.slf4j.LoggerFactory;
return CAT_NAME;
case 6: // VALID_WRITE_ID_LIST
return VALID_WRITE_ID_LIST;
+ case 7: // GET_COLUMN_STATS
+ return GET_COLUMN_STATS;
default:
return null;
}
@@ -127,7 +132,9 @@ import org.slf4j.LoggerFactory;
}
// isset id assignments
- private static final _Fields optionals[] = {_Fields.CAPABILITIES,_Fields.CAT_NAME,_Fields.VALID_WRITE_ID_LIST};
+ private static final int __GETCOLUMNSTATS_ISSET_ID = 0;
+ private byte __isset_bitfield = 0;
+ private static final _Fields optionals[] = {_Fields.CAPABILITIES,_Fields.CAT_NAME,_Fields.VALID_WRITE_ID_LIST,_Fields.GET_COLUMN_STATS};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -141,6 +148,8 @@ import org.slf4j.LoggerFactory;
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.VALID_WRITE_ID_LIST, new org.apache.thrift.meta_data.FieldMetaData("validWriteIdList", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.GET_COLUMN_STATS, new org.apache.thrift.meta_data.FieldMetaData("getColumnStats", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(GetTableRequest.class, metaDataMap);
}
@@ -161,6 +170,7 @@ import org.slf4j.LoggerFactory;
* Performs a deep copy on <i>other</i>.
*/
public GetTableRequest(GetTableRequest other) {
+ __isset_bitfield = other.__isset_bitfield;
if (other.isSetDbName()) {
this.dbName = other.dbName;
}
@@ -176,6 +186,7 @@ import org.slf4j.LoggerFactory;
if (other.isSetValidWriteIdList()) {
this.validWriteIdList = other.validWriteIdList;
}
+ this.getColumnStats = other.getColumnStats;
}
public GetTableRequest deepCopy() {
@@ -189,6 +200,8 @@ import org.slf4j.LoggerFactory;
this.capabilities = null;
this.catName = null;
this.validWriteIdList = null;
+ setGetColumnStatsIsSet(false);
+ this.getColumnStats = false;
}
public String getDbName() {
@@ -306,6 +319,28 @@ import org.slf4j.LoggerFactory;
}
}
+ public boolean isGetColumnStats() {
+ return this.getColumnStats;
+ }
+
+ public void setGetColumnStats(boolean getColumnStats) {
+ this.getColumnStats = getColumnStats;
+ setGetColumnStatsIsSet(true);
+ }
+
+ public void unsetGetColumnStats() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __GETCOLUMNSTATS_ISSET_ID);
+ }
+
+ /** Returns true if field getColumnStats is set (has been assigned a value) and false otherwise */
+ public boolean isSetGetColumnStats() {
+ return EncodingUtils.testBit(__isset_bitfield, __GETCOLUMNSTATS_ISSET_ID);
+ }
+
+ public void setGetColumnStatsIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __GETCOLUMNSTATS_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case DB_NAME:
@@ -348,6 +383,14 @@ import org.slf4j.LoggerFactory;
}
break;
+ case GET_COLUMN_STATS:
+ if (value == null) {
+ unsetGetColumnStats();
+ } else {
+ setGetColumnStats((Boolean)value);
+ }
+ break;
+
}
}
@@ -368,6 +411,9 @@ import org.slf4j.LoggerFactory;
case VALID_WRITE_ID_LIST:
return getValidWriteIdList();
+ case GET_COLUMN_STATS:
+ return isGetColumnStats();
+
}
throw new IllegalStateException();
}
@@ -389,6 +435,8 @@ import org.slf4j.LoggerFactory;
return isSetCatName();
case VALID_WRITE_ID_LIST:
return isSetValidWriteIdList();
+ case GET_COLUMN_STATS:
+ return isSetGetColumnStats();
}
throw new IllegalStateException();
}
@@ -451,6 +499,15 @@ import org.slf4j.LoggerFactory;
return false;
}
+ boolean this_present_getColumnStats = true && this.isSetGetColumnStats();
+ boolean that_present_getColumnStats = true && that.isSetGetColumnStats();
+ if (this_present_getColumnStats || that_present_getColumnStats) {
+ if (!(this_present_getColumnStats && that_present_getColumnStats))
+ return false;
+ if (this.getColumnStats != that.getColumnStats)
+ return false;
+ }
+
return true;
}
@@ -483,6 +540,11 @@ import org.slf4j.LoggerFactory;
if (present_validWriteIdList)
list.add(validWriteIdList);
+ boolean present_getColumnStats = true && (isSetGetColumnStats());
+ list.add(present_getColumnStats);
+ if (present_getColumnStats)
+ list.add(getColumnStats);
+
return list.hashCode();
}
@@ -544,6 +606,16 @@ import org.slf4j.LoggerFactory;
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetGetColumnStats()).compareTo(other.isSetGetColumnStats());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetGetColumnStats()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.getColumnStats, other.getColumnStats);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -609,6 +681,12 @@ import org.slf4j.LoggerFactory;
}
first = false;
}
+ if (isSetGetColumnStats()) {
+ if (!first) sb.append(", ");
+ sb.append("getColumnStats:");
+ sb.append(this.getColumnStats);
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -639,6 +717,8 @@ import org.slf4j.LoggerFactory;
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bitfield = 0;
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
@@ -704,6 +784,14 @@ import org.slf4j.LoggerFactory;
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 7: // GET_COLUMN_STATS
+ if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
+ struct.getColumnStats = iprot.readBool();
+ struct.setGetColumnStatsIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -748,6 +836,11 @@ import org.slf4j.LoggerFactory;
oprot.writeFieldEnd();
}
}
+ if (struct.isSetGetColumnStats()) {
+ oprot.writeFieldBegin(GET_COLUMN_STATS_FIELD_DESC);
+ oprot.writeBool(struct.getColumnStats);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -777,7 +870,10 @@ import org.slf4j.LoggerFactory;
if (struct.isSetValidWriteIdList()) {
optionals.set(2);
}
- oprot.writeBitSet(optionals, 3);
+ if (struct.isSetGetColumnStats()) {
+ optionals.set(3);
+ }
+ oprot.writeBitSet(optionals, 4);
if (struct.isSetCapabilities()) {
struct.capabilities.write(oprot);
}
@@ -787,6 +883,9 @@ import org.slf4j.LoggerFactory;
if (struct.isSetValidWriteIdList()) {
oprot.writeString(struct.validWriteIdList);
}
+ if (struct.isSetGetColumnStats()) {
+ oprot.writeBool(struct.getColumnStats);
+ }
}
@Override
@@ -796,7 +895,7 @@ import org.slf4j.LoggerFactory;
struct.setDbNameIsSet(true);
struct.tblName = iprot.readString();
struct.setTblNameIsSet(true);
- BitSet incoming = iprot.readBitSet(3);
+ BitSet incoming = iprot.readBitSet(4);
if (incoming.get(0)) {
struct.capabilities = new ClientCapabilities();
struct.capabilities.read(iprot);
@@ -810,6 +909,10 @@ import org.slf4j.LoggerFactory;
struct.validWriteIdList = iprot.readString();
struct.setValidWriteIdListIsSet(true);
}
+ if (incoming.get(3)) {
+ struct.getColumnStats = iprot.readBool();
+ struct.setGetColumnStatsIsSet(true);
+ }
}
}
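Taken together with the new colStats field on Table, a metastore client can now fetch a table and its column statistics in one call. A hedged sketch; thriftClient stands for a ThriftHiveMetastore.Iface handle and is an assumption here:

    // Request column statistics along with the table, avoiding a separate
    // get_table_column_statistics round trip.
    GetTableRequest req = new GetTableRequest("db1", "t1");
    req.setGetColumnStats(true);                     // new optional field (id 7)
    Table tbl = thriftClient.get_table_req(req).getTable();
    ColumnStatistics stats = tbl.getColStats();      // Table accessor added by this patch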