Posted to commits@hive.apache.org by sa...@apache.org on 2019/04/12 11:23:51 UTC
[hive] branch master updated: HIVE-21109: Support stats replication for ACID tables (Ashutosh Bapat, reviewed by Sankar Hariappan)
This is an automated email from the ASF dual-hosted git repository.
sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new ec6af1b HIVE-21109: Support stats replication for ACID tables (Ashutosh Bapat, reviewed by Sankar Hariappan)
ec6af1b is described below
commit ec6af1bc775b8bb8807e6bcdb86afb05b11ac833
Author: Ashutosh Bapat <ab...@cloudera.com>
AuthorDate: Fri Apr 12 16:53:09 2019 +0530
HIVE-21109: Support stats replication for ACID tables (Ashutosh Bapat, reviewed by Sankar Hariappan)
Signed-off-by: Sankar Hariappan <sa...@apache.org>
---
.../ql/parse/TestStatsReplicationScenarios.java | 405 ++++++++++++++++++---
...java => TestStatsReplicationScenariosACID.java} | 22 +-
...StatsReplicationScenariosACIDNoAutogather.java} | 20 +-
...r.java => TestStatsReplicationScenariosMM.java} | 20 +-
...stStatsReplicationScenariosMMNoAutogather.java} | 23 +-
.../TestStatsReplicationScenariosMigration.java | 73 ++++
...sReplicationScenariosMigrationNoAutogather.java | 73 ++++
.../TestStatsReplicationScenariosNoAutogather.java | 7 +-
.../hadoop/hive/ql/parse/WarehouseInstance.java | 14 +-
.../hadoop/hive/ql/ddl/table/CreateTableDesc.java | 22 +-
.../hive/ql/ddl/table/CreateTableOperation.java | 17 +-
.../hadoop/hive/ql/exec/ColumnStatsUpdateTask.java | 52 ++-
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 36 +-
.../org/apache/hadoop/hive/ql/exec/MoveTask.java | 9 +-
.../apache/hadoop/hive/ql/exec/ReplCopyTask.java | 7 +-
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 3 -
.../bootstrap/events/filesystem/FSTableEvent.java | 18 +-
.../hadoop/hive/ql/exec/repl/util/ReplUtils.java | 78 +++-
.../org/apache/hadoop/hive/ql/metadata/Hive.java | 106 ++++--
.../org/apache/hadoop/hive/ql/metadata/Table.java | 7 -
.../hive/ql/parse/ImportSemanticAnalyzer.java | 92 +++--
.../hive/ql/parse/repl/dump/TableExport.java | 11 +-
.../repl/dump/events/UpdatePartColStatHandler.java | 7 -
.../dump/events/UpdateTableColStatHandler.java | 5 -
.../repl/load/message/RenamePartitionHandler.java | 36 +-
.../repl/load/message/RenameTableHandler.java | 13 +-
.../load/message/UpdatePartColStatHandler.java | 13 +-
.../load/message/UpdateTableColStatHandler.java | 16 +-
.../hadoop/hive/ql/plan/AddPartitionDesc.java | 5 +
.../hadoop/hive/ql/plan/ColumnStatsUpdateWork.java | 10 +-
.../hadoop/hive/ql/plan/ImportTableDesc.java | 10 +-
.../hadoop/hive/ql/plan/RenamePartitionDesc.java | 2 +
.../hadoop/hive/metastore/HiveMetaStore.java | 71 +++-
.../apache/hadoop/hive/metastore/ObjectStore.java | 27 +-
.../hadoop/hive/metastore/cache/CachedStore.java | 2 +-
.../ptest2/conf/deployed/master-mr2.properties | 10 +-
36 files changed, 1018 insertions(+), 324 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java
index 1ec4498..94eb1ff 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java
@@ -20,10 +20,17 @@ package org.apache.hadoop.hive.ql.parse;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
import org.apache.hadoop.hive.shims.Utils;
import org.junit.After;
import org.junit.AfterClass;
@@ -43,6 +50,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
@@ -60,35 +68,52 @@ public class TestStatsReplicationScenarios {
private static HiveConf conf;
private static boolean hasAutogather;
+ enum AcidTableKind {
+ FULL_ACID,
+ INSERT_ONLY
+ }
+
+ private static AcidTableKind acidTableKindToUse;
+
@BeforeClass
public static void classLevelSetup() throws Exception {
Map<String, String> overrides = new HashMap<>();
overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
GzipJSONMessageEncoder.class.getCanonicalName());
- internalBeforeClassSetup(overrides, TestReplicationScenarios.class, true);
+ internalBeforeClassSetup(overrides, overrides, TestReplicationScenarios.class, true, null);
}
- static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz,
- boolean autogather)
+ static void internalBeforeClassSetup(Map<String, String> primaryOverrides,
+ Map<String, String> replicaOverrides, Class clazz,
+ boolean autogather, AcidTableKind acidTableKind)
throws Exception {
conf = new HiveConf(clazz);
conf.set("dfs.client.use.datanode.hostname", "true");
conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
MiniDFSCluster miniDFSCluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
- Map<String, String> localOverrides = new HashMap<String, String>() {{
+ Map<String, String> additionalOverrides = new HashMap<String, String>() {{
put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true");
}};
- localOverrides.putAll(overrides);
- replica = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
+ Map<String, String> overrides = new HashMap<>();
+
+ overrides.putAll(additionalOverrides);
+ overrides.putAll(replicaOverrides);
+ replica = new WarehouseInstance(LOG, miniDFSCluster, overrides);
// Run with autogather false on primary if requested
hasAutogather = autogather;
- localOverrides.put(HiveConf.ConfVars.HIVESTATSAUTOGATHER.varname,
- autogather ? "true" : "false");
- primary = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
+ additionalOverrides.put(HiveConf.ConfVars.HIVESTATSAUTOGATHER.varname,
+ autogather ? "true" : "false");
+ overrides.clear();
+ overrides.putAll(additionalOverrides);
+ overrides.putAll(primaryOverrides);
+ primary = new WarehouseInstance(LOG, miniDFSCluster, overrides);
+
+ // Use transactional tables
+ acidTableKindToUse = acidTableKind;
}
@AfterClass
@@ -128,7 +153,8 @@ public class TestStatsReplicationScenarios {
private void verifyReplicatedStatsForTable(String tableName) throws Throwable {
// Test column stats
- Assert.assertEquals(primary.getTableColumnStatistics(primaryDbName, tableName),
+ Assert.assertEquals("Mismatching column statistics for table " + tableName,
+ primary.getTableColumnStatistics(primaryDbName, tableName),
replica.getTableColumnStatistics(replicatedDbName, tableName));
// Test table level stats
@@ -136,8 +162,9 @@ public class TestStatsReplicationScenarios {
collectStatsParams(replica.getTable(replicatedDbName, tableName).getParameters());
Map<String, String> pParams =
collectStatsParams(primary.getTable(primaryDbName, tableName).getParameters());
- Assert.assertEquals(pParams, rParams);
+ Assert.assertEquals("Mismatch in stats parameters for table " + tableName, pParams, rParams);
+ primary.getTable(primaryDbName, tableName).getPartitionKeys();
verifyReplicatedStatsForPartitionsOfTable(tableName);
}
@@ -151,18 +178,24 @@ public class TestStatsReplicationScenarios {
return;
}
+ List<FieldSchema> partKeys = primary.getTable(primaryDbName, tableName).getPartitionKeys();
for (Partition pPart : pParts) {
Partition rPart = replica.getPartition(replicatedDbName, tableName,
pPart.getValues());
Map<String, String> rParams = collectStatsParams(rPart.getParameters());
Map<String, String> pParams = collectStatsParams(pPart.getParameters());
- Assert.assertEquals(pParams, rParams);
+ String partName = Warehouse.makePartName(partKeys, pPart.getValues());
+ Assert.assertEquals("Mismatch in stats parameters for partition " + partName + " of table " + tableName,
+ pParams, rParams);
+
+ // Test partition column stats for the partition
+ Assert.assertEquals("Mismatching column statistics for partition " + partName + "of table " + tableName,
+ primary.getPartitionColumnStatistics(primaryDbName, tableName, partName,
+ StatsSetupConst.getColumnsHavingStats(pParams)),
+ replica.getPartitionColumnStatistics(replicatedDbName, tableName, partName,
+ StatsSetupConst.getColumnsHavingStats(rParams)));
}
-
- // Test partition column stats for all partitions
- Assert.assertEquals(primary.getAllPartitionColumnStatistics(primaryDbName, tableName),
- replica.getAllPartitionColumnStatistics(replicatedDbName, tableName));
}
private void verifyNoStatsReplicationForMetadataOnly(String tableName) throws Throwable {
@@ -170,17 +203,18 @@ public class TestStatsReplicationScenarios {
Assert.assertTrue(replica.getTableColumnStatistics(replicatedDbName, tableName).isEmpty());
// When no data is replicated, the basic stats parameters for table should look as if it's a
- // new table created on replica. Based on the create table rules the basic stats may be true
- // or false. Either is fine with us so don't bother checking exact values.
+ // new table created on replica, i.e., zero or null.
Map<String, String> rParams =
collectStatsParams(replica.getTable(replicatedDbName, tableName).getParameters());
- Map<String, String> expectedFalseParams = new HashMap<>();
- Map<String, String> expectedTrueParams = new HashMap<>();
- StatsSetupConst.setStatsStateForCreateTable(expectedTrueParams,
- replica.getTableColNames(replicatedDbName, tableName), StatsSetupConst.TRUE);
- StatsSetupConst.setStatsStateForCreateTable(expectedFalseParams,
- replica.getTableColNames(replicatedDbName, tableName), StatsSetupConst.FALSE);
- Assert.assertTrue(rParams.equals(expectedFalseParams) || rParams.equals(expectedTrueParams));
+ for (String param : StatsSetupConst.SUPPORTED_STATS) {
+ String val = rParams.get(param);
+ Assert.assertTrue("parameter " + param + " of table " + tableName + " is expected to be " +
+ "null or 0", val == null || val.trim().equals("0"));
+ }
+
+ // As long as the above conditions are met, it doesn't matter whether basic and column stats
+ // state are set to true or false. If those are false, actual values are immaterial. If they
+ // are true, the values asserted above represent the correct state of no data.
verifyNoPartitionStatsReplicationForMetadataOnly(tableName);
}
@@ -196,7 +230,8 @@ public class TestStatsReplicationScenarios {
// Partitions are not replicated in metadata only replication.
List<Partition> rParts = replica.getAllPartitions(replicatedDbName, tableName);
- Assert.assertTrue(rParts == null || rParts.isEmpty());
+ Assert.assertTrue("Partitions replicated in a metadata only dump",
+ rParts == null || rParts.isEmpty());
// Test partition column stats for all partitions
Map<String, List<ColumnStatisticsObj>> rPartColStats =
@@ -207,6 +242,18 @@ public class TestStatsReplicationScenarios {
}
}
+ private String getCreateTableProperties() {
+ if (acidTableKindToUse == AcidTableKind.FULL_ACID) {
+ return " stored as orc TBLPROPERTIES('transactional'='true')";
+ }
+
+ if (acidTableKindToUse == AcidTableKind.INSERT_ONLY) {
+ return " TBLPROPERTIES('transactional'='true', 'transactional_properties'='insert_only')";
+ }
+
+ return "";
+ }
+
private List<String> createBootStrapData() throws Throwable {
// Unpartitioned table with data
String simpleTableName = "sTable";
@@ -216,16 +263,19 @@ public class TestStatsReplicationScenarios {
String ndTableName = "ndTable";
// Partitioned table without data during bootstrap and hence no stats.
String ndPartTableName = "ndPTable";
+ String tblCreateExtra = getCreateTableProperties();
primary.run("use " + primaryDbName)
- .run("create table " + simpleTableName + " (id int)")
+ .run("create table " + simpleTableName + " (id int)" + tblCreateExtra)
.run("insert into " + simpleTableName + " values (1), (2)")
- .run("create table " + partTableName + " (place string) partitioned by (country string)")
+ .run("create table " + partTableName + " (place string) partitioned by (country string)"
+ + tblCreateExtra)
.run("insert into " + partTableName + " partition(country='india') values ('bangalore')")
.run("insert into " + partTableName + " partition(country='us') values ('austin')")
.run("insert into " + partTableName + " partition(country='france') values ('paris')")
- .run("create table " + ndTableName + " (str string)")
- .run("create table " + ndPartTableName + " (val string) partitioned by (pk int)");
+ .run("create table " + ndTableName + " (str string)" + tblCreateExtra)
+ .run("create table " + ndPartTableName + " (val string) partitioned by (pk int)" +
+ tblCreateExtra);
List<String> tableNames = new ArrayList<>(Arrays.asList(simpleTableName, partTableName,
ndTableName, ndPartTableName));
@@ -246,13 +296,14 @@ public class TestStatsReplicationScenarios {
* Dumps primarydb on primary, loads it on replica as replicadb, verifies that the statistics
* loaded are same as the ones on primary.
* @param tableNames, names of tables on primary expected to be loaded
- * @param lastReplicationId of the last dump, for incremental dump/load
* @param parallelLoad, if true, parallel bootstrap load is used
* @param metadataOnly, only metadata is dumped and loaded.
+ * @param lastReplicationId of the last dump, for incremental dump/load
+ * @param failRetry, if true, fail the load once and verify that the subsequent retry succeeds
* @return lastReplicationId of the dump performed.
*/
private String dumpLoadVerify(List<String> tableNames, String lastReplicationId,
- boolean parallelLoad, boolean metadataOnly)
+ boolean parallelLoad, boolean metadataOnly, boolean failRetry)
throws Throwable {
List<String> withClauseList;
// Parallel load works only for bootstrap.
@@ -269,15 +320,24 @@ public class TestStatsReplicationScenarios {
WarehouseInstance.Tuple dumpTuple = primary.run("use " + primaryDbName)
.dump(primaryDbName, lastReplicationId, withClauseList);
+
// Load, if necessary changing configuration.
if (parallelLoad) {
replica.hiveConf.setBoolVar(HiveConf.ConfVars.EXECPARALLEL, true);
}
- replica.load(replicatedDbName, dumpTuple.dumpLocation)
- .run("use " + replicatedDbName)
- .run("show tables")
- .verifyResults(tableNames.toArray(new String[1]));
+ // Fail the load when testing the failure and retry scenario. Fail it while setting the
+ // checkpoint for a table in the middle of the list of tables.
+ if (failRetry) {
+ if (lastReplicationId == null) {
+ failBootstrapLoad(dumpTuple, tableNames.size()/2);
+ } else {
+ failIncrementalLoad(dumpTuple);
+ }
+ }
+
+ // Load, possibly a retry
+ replica.load(replicatedDbName, dumpTuple.dumpLocation);
// Metadata load may not load all the events.
if (!metadataOnly) {
@@ -301,12 +361,108 @@ public class TestStatsReplicationScenarios {
return dumpTuple.lastReplicationId;
}
+ /**
+ * Run a bootstrap load that is made to fail partway through.
+ * @param tuple the location of the bootstrap dump
+ * @param failAfterNumTables number of tables to load before injecting the failure
+ */
+ private void failBootstrapLoad(WarehouseInstance.Tuple tuple, int failAfterNumTables) throws Throwable {
+ // Fail setting the ckpt directory property partway through the table list so that we test the case when
+ // bootstrap load fails after some but not all tables are loaded.
+ BehaviourInjection<CallerArguments, Boolean> callerVerifier
+ = new BehaviourInjection<CallerArguments, Boolean>() {
+ int cntTables = 0;
+ String prevTable = null;
+ @Nullable
+ @Override
+ public Boolean apply(@Nullable CallerArguments args) {
+ if (prevTable == null ||
+ !prevTable.equalsIgnoreCase(args.tblName)) {
+ cntTables++;
+ }
+ prevTable = args.tblName;
+ if (args.dbName.equalsIgnoreCase(replicatedDbName) && cntTables > failAfterNumTables) {
+ injectionPathCalled = true;
+ LOG.warn("Verifier - DB : " + args.dbName + " TABLE : " + args.tblName);
+ return false;
+ }
+ return true;
+ }
+ };
+
+ InjectableBehaviourObjectStore.setAlterTableModifier(callerVerifier);
+ try {
+ replica.loadFailure(replicatedDbName, tuple.dumpLocation);
+ callerVerifier.assertInjectionsPerformed(true, false);
+ } finally {
+ InjectableBehaviourObjectStore.resetAlterTableModifier();
+ }
+ }
+
+ private void failIncrementalLoad(WarehouseInstance.Tuple dumpTuple) throws Throwable {
+ // Fail add notification when the second update table stats event is encountered. Thus we
+ // test successful application as well as failed application of this event.
+ BehaviourInjection<NotificationEvent, Boolean> callerVerifier
+ = new BehaviourInjection<NotificationEvent, Boolean>() {
+ int cntEvents = 0;
+ @Override
+ public Boolean apply(NotificationEvent entry) {
+ cntEvents++;
+ if (entry.getEventType().equalsIgnoreCase(EventMessage.EventType.UPDATE_TABLE_COLUMN_STAT.toString()) &&
+ cntEvents > 1) {
+ injectionPathCalled = true;
+ LOG.warn("Verifier - DB: " + entry.getDbName()
+ + " Table: " + entry.getTableName()
+ + " Event: " + entry.getEventType());
+ return false;
+ }
+ return true;
+ }
+ };
+
+ InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
+ try {
+ replica.loadFailure(replicatedDbName, dumpTuple.dumpLocation);
+ } finally {
+ InjectableBehaviourObjectStore.resetAddNotificationModifier();
+ }
+ callerVerifier.assertInjectionsPerformed(true, false);
+
+ // Fail add notification when the second update partition stats event is encountered. Thus we
+ // test successful application as well as failed application of this event.
+ callerVerifier = new BehaviourInjection<NotificationEvent, Boolean>() {
+ int cntEvents = 0;
+
+ @Override
+ public Boolean apply(NotificationEvent entry) {
+ cntEvents++;
+ if (entry.getEventType().equalsIgnoreCase(EventMessage.EventType.UPDATE_PARTITION_COLUMN_STAT.toString()) &&
+ cntEvents > 1) {
+ injectionPathCalled = true;
+ LOG.warn("Verifier - DB: " + entry.getDbName()
+ + " Table: " + entry.getTableName()
+ + " Event: " + entry.getEventType());
+ return false;
+ }
+ return true;
+ }
+ };
+
+ InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
+ try {
+ replica.loadFailure(replicatedDbName, dumpTuple.dumpLocation);
+ } finally {
+ InjectableBehaviourObjectStore.resetAddNotificationModifier();
+ }
+ callerVerifier.assertInjectionsPerformed(true, false);
+ }
+
private void createIncrementalData(List<String> tableNames) throws Throwable {
// Annotations for this table are same as createBootStrapData
String simpleTableName = "sTable";
String partTableName = "pTable";
String ndTableName = "ndTable";
String ndPartTableName = "ndPTable";
+ String tblCreateExtra = getCreateTableProperties();
Assert.assertTrue(tableNames.containsAll(Arrays.asList(simpleTableName, partTableName,
ndTableName, ndPartTableName)));
@@ -324,14 +480,15 @@ public class TestStatsReplicationScenarios {
.run("insert into " + partTableName + "(country, place) values ('us', 'chicago')")
// new partition
.run("insert into " + partTableName + "(country, place) values ('australia', 'perth')")
- .run("create table " + incTableName + " (config string, enabled boolean)")
+ .run("create table " + incTableName + " (config string, enabled boolean)" +
+ tblCreateExtra)
.run("insert into " + incTableName + " values ('conf1', true)")
.run("insert into " + incTableName + " values ('conf2', false)")
.run("insert into " + ndPartTableName + "(pk, val) values (1, 'one')")
.run("insert into " + ndPartTableName + "(pk, val) values (1, 'another one')")
.run("insert into " + ndPartTableName + "(pk, val) values (2, 'two')")
.run("create table " + incPartTableName +
- "(val string) partitioned by (tvalue boolean)")
+ "(val string) partitioned by (tvalue boolean)" + tblCreateExtra)
.run("insert into " + incPartTableName + "(tvalue, val) values (true, 'true')")
.run("insert into " + incPartTableName + "(tvalue, val) values (false, 'false')");
@@ -347,29 +504,183 @@ public class TestStatsReplicationScenarios {
}
}
- private void testStatsReplicationCommon(boolean parallelBootstrap, boolean metadataOnly) throws Throwable {
+ private void applyDMLOperations(List<String> tableNames) throws Throwable {
+ // Annotations for these tables are the same as in createBootStrapData
+ String simpleTableName = "sTable";
+ String partTableName = "pTable";
+ String ndTableName = "ndTable";
+ String ndPartTableName = "ndPTable";
+ String incTableName = "iTable"; // New table
+ String tblCreateExtra = getCreateTableProperties();
+
+ Assert.assertTrue(tableNames.containsAll(Arrays.asList(simpleTableName, partTableName,
+ ndTableName, ndPartTableName, incTableName)));
+
+ String ctasTableName = "ctasTable"; // Table created through CTAS
+ String ctasPartTableName = "ctasPartTable"; // Table created through CTAS
+ // Tables created through import
+ String eximTableName = "eximTable";
+ String eximPartTableName = "eximPartTable";
+ // Tables created through load
+ String loadTableName = "loadTable";
+ String loadPartTableName = "loadPartTable";
+
+ String exportPath = "'hdfs:///tmp/" + primaryDbName + "/" + incTableName + "/'";
+ String exportPartPath = "'hdfs:///tmp/" + primaryDbName + "/" + partTableName + "/'";
+ String localDir = "./test.dat";
+ String inPath = localDir + "/000000_0";
+ String tableStorage = "";
+ if (acidTableKindToUse == AcidTableKind.FULL_ACID) {
+ tableStorage = "stored as orc";
+ }
+
+ primary.run("use " + primaryDbName)
+ // insert overwrite
+ .run("insert overwrite table " + simpleTableName + " values (5), (6), (7)")
+ .run("insert overwrite table " + partTableName + " partition (country='india') " +
+ " values ('bombay')")
+ // truncate
+ .run("truncate table " + ndTableName)
+ .run("truncate table " + ndPartTableName + " partition (pk=1)")
+ // CTAS
+ .run("create table " + ctasTableName + " as select * from " + incTableName)
+ .run("create table " + ctasPartTableName + " as select * from " + partTableName)
+ // Import
+ .run("export table " + partTableName + " to " + exportPartPath)
+ .run("import table " + eximPartTableName + " from " + exportPartPath)
+ .run("export table " + incTableName + " to " + exportPath)
+ .run("import table " + eximTableName + " from " + exportPath)
+ // load
+ .run("insert overwrite local directory '" + localDir + "'" + tableStorage + " select " +
+ "* from " + simpleTableName)
+ .run("create table " + loadTableName + " (id int)" + tblCreateExtra)
+ .run("load data local inpath '" + inPath + "' overwrite into table " + loadTableName)
+ .run("create table " + loadPartTableName + " (id int) partitioned by (key int) " + tblCreateExtra)
+ .run("load data local inpath '" + inPath + "' overwrite into table "
+ + loadPartTableName + " partition (key=1)");
+
+ tableNames.add(ctasTableName);
+ tableNames.add(ctasPartTableName);
+ tableNames.add(eximTableName);
+ tableNames.add(eximPartTableName);
+ tableNames.add(loadTableName);
+ tableNames.add(loadPartTableName);
+
+ // Run analyze on each of the tables, if they are not being gathered automatically.
+ if (!hasAutogather) {
+ for (String name : tableNames) {
+ primary.run("use " + primaryDbName)
+ .run("analyze table " + name + " compute statistics for columns");
+ }
+ }
+ }
+
+ private void applyTransactionalDMLOperations(List<String> tableNames) throws Throwable {
+ // Annotations for these tables are the same as in createBootStrapData
+ String partTableName = "pTable";
+ String ndTableName = "ndTable";
+ String incTableName = "iTable";
+ String eximTableName = "eximTable";
+ String eximPartTableName = "eximPartTable";
+
+ Assert.assertTrue(tableNames.containsAll(Arrays.asList(partTableName, ndTableName,
+ eximPartTableName, eximTableName, incTableName)));
+
+ primary.run("update " + partTableName + " set place = 'mumbai' where place = 'bombay'")
+ .run("delete from " + partTableName + " where place = 'chicago'")
+ .run("merge into " + eximPartTableName + " as T using " + partTableName + " as U "
+ + " on T.country = U.country "
+ + " when matched and T.place != U.place then update set place = U.place"
+ + " when not matched then insert values (U.country, U.place)")
+ .run("update " + incTableName + " set enabled = false where config = 'conf1'")
+ .run("merge into " + eximTableName + " as T using " + incTableName + " as U "
+ + " on T.config = U.config"
+ + " when matched and T.enabled != U.enabled then update set enabled = U.enabled"
+ + " when not matched then insert values (U.config, U.enabled)")
+ .run("delete from " + ndTableName);
+
+ // Run analyze on each of the tables, if they are not being gathered automatically.
+ if (!hasAutogather) {
+ for (String name : tableNames) {
+ primary.run("use " + primaryDbName)
+ .run("analyze table " + name + " compute statistics for columns");
+ }
+ }
+ }
+
+ private void applyDDLOperations(List<String> tableNames) throws Throwable {
+ // Annotations for these tables are the same as in createBootStrapData
+ String simpleTableName = "sTable";
+ String partTableName = "pTable";
+ String incTableName = "iTable";
+ String ctasTableName = "ctasTable"; // Table created through CTAS
+
+ Assert.assertTrue(tableNames.containsAll(Arrays.asList(simpleTableName, partTableName,
+ incTableName, ctasTableName)));
+
+ String renamedTableName = "rnTable";
+
+ primary.run("use " + primaryDbName)
+ .run("alter table " + simpleTableName + " add columns (val int)")
+ .run("alter table " + incTableName + " change config configuration string")
+ .run("alter table " + ctasTableName + " rename to " + renamedTableName)
+ .run("alter table " + partTableName +
+ " partition(country='us') rename to partition (country='usa')");
+
+ tableNames.remove(ctasTableName);
+ tableNames.add(renamedTableName);
+ }
+
+ private void testStatsReplicationCommon(boolean parallelBootstrap, boolean metadataOnly,
+ boolean failRetry) throws Throwable {
List<String> tableNames = createBootStrapData();
String lastReplicationId = dumpLoadVerify(tableNames, null, parallelBootstrap,
- metadataOnly);
+ metadataOnly, failRetry);
// Incremental dump
createIncrementalData(tableNames);
lastReplicationId = dumpLoadVerify(tableNames, lastReplicationId, parallelBootstrap,
- metadataOnly);
+ metadataOnly, failRetry);
+
+ // Incremental dump with Insert overwrite operation
+ applyDMLOperations(tableNames);
+ lastReplicationId = dumpLoadVerify(tableNames, lastReplicationId, parallelBootstrap,
+ metadataOnly, false);
+
+ // Incremental dump with transactional DML operations
+ if (acidTableKindToUse == AcidTableKind.FULL_ACID) {
+ applyTransactionalDMLOperations(tableNames);
+ lastReplicationId = dumpLoadVerify(tableNames, lastReplicationId, parallelBootstrap,
+ metadataOnly, false);
+ }
+
+ // Incremental dump with DDL operations
+ applyDDLOperations(tableNames);
+ lastReplicationId = dumpLoadVerify(tableNames, lastReplicationId, parallelBootstrap,
+ metadataOnly, false);
+ }
+
+ @Test
+ public void testNonParallelBootstrapLoad() throws Throwable {
+ LOG.info("Testing " + testName.getClass().getName() + "." + testName.getMethodName());
+ testStatsReplicationCommon(false, false, false);
}
@Test
- public void testForNonAcidTables() throws Throwable {
- testStatsReplicationCommon(false, false);
+ public void testForParallelBootstrapLoad() throws Throwable {
+ LOG.info("Testing " + testName.getClass().getName() + "." + testName.getMethodName());
+ testStatsReplicationCommon(true, false, false);
}
@Test
- public void testForNonAcidTablesParallelBootstrapLoad() throws Throwable {
- testStatsReplicationCommon(true, false);
+ public void testMetadataOnlyDump() throws Throwable {
+ LOG.info("Testing " + testName.getClass().getName() + "." + testName.getMethodName());
+ testStatsReplicationCommon(false, true, false);
}
@Test
- public void testNonAcidMetadataOnlyDump() throws Throwable {
- testStatsReplicationCommon(false, true);
+ public void testRetryFailure() throws Throwable {
+ LOG.info("Testing " + testName.getClass().getName() + "." + testName.getMethodName());
+ testStatsReplicationCommon(false, false, true);
}
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosACID.java
similarity index 67%
copy from itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
copy to itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosACID.java
index 51f8dfb..ea42e0c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosACID.java
@@ -23,31 +23,31 @@ import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncod
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
/**
- * Tests statistics replication when statistics are collected using ANALYZE command.
+ * Tests statistics replication for ACID tables.
*/
-public class TestStatsReplicationScenariosNoAutogather extends TestStatsReplicationScenarios {
+public class TestStatsReplicationScenariosACID extends TestStatsReplicationScenarios {
@Rule
public final TestName testName = new TestName();
- protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
- static WarehouseInstance primary;
- private static WarehouseInstance replica;
- private String primaryDbName, replicatedDbName;
- private static HiveConf conf;
-
@BeforeClass
public static void classLevelSetup() throws Exception {
Map<String, String> overrides = new HashMap<>();
overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
GzipJSONMessageEncoder.class.getCanonicalName());
+ overrides.put(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "true");
+ overrides.put(HiveConf.ConfVars.HIVE_TXN_MANAGER.varname,
+ "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+ overrides.put(MetastoreConf.ConfVars.CAPABILITY_CHECK.getHiveName(), "false");
+ overrides.put(HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT.varname, "1s");
+ overrides.put(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE.varname, "nonstrict");
+
- internalBeforeClassSetup(overrides, TestReplicationScenarios.class, false);
+ internalBeforeClassSetup(overrides, overrides, TestStatsReplicationScenariosACID.class, true,
+ AcidTableKind.FULL_ACID);
}
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosACIDNoAutogather.java
similarity index 67%
copy from itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
copy to itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosACIDNoAutogather.java
index 51f8dfb..bf744af 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosACIDNoAutogather.java
@@ -30,24 +30,26 @@ import java.util.HashMap;
import java.util.Map;
/**
- * Tests statistics replication when statistics are collected using ANALYZE command.
+ * Tests statistics replication for ACID tables.
*/
-public class TestStatsReplicationScenariosNoAutogather extends TestStatsReplicationScenarios {
+public class TestStatsReplicationScenariosACIDNoAutogather extends TestStatsReplicationScenarios {
@Rule
public final TestName testName = new TestName();
- protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
- static WarehouseInstance primary;
- private static WarehouseInstance replica;
- private String primaryDbName, replicatedDbName;
- private static HiveConf conf;
-
@BeforeClass
public static void classLevelSetup() throws Exception {
Map<String, String> overrides = new HashMap<>();
overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
GzipJSONMessageEncoder.class.getCanonicalName());
+ overrides.put(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "true");
+ overrides.put(HiveConf.ConfVars.HIVE_TXN_MANAGER.varname,
+ "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+ overrides.put(MetastoreConf.ConfVars.CAPABILITY_CHECK.getHiveName(),"false");
+ overrides.put(HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT.varname,"1s");
+ overrides.put(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE.varname, "nonstrict");
+
- internalBeforeClassSetup(overrides, TestReplicationScenarios.class, false);
+ internalBeforeClassSetup(overrides, overrides,
+ TestStatsReplicationScenariosACIDNoAutogather.class, false, AcidTableKind.FULL_ACID);
}
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMM.java
similarity index 68%
copy from itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
copy to itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMM.java
index 51f8dfb..4dc6558 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMM.java
@@ -30,24 +30,26 @@ import java.util.HashMap;
import java.util.Map;
/**
- * Tests statistics replication when statistics are collected using ANALYZE command.
+ * Tests statistics replication for ACID tables.
*/
-public class TestStatsReplicationScenariosNoAutogather extends TestStatsReplicationScenarios {
+public class TestStatsReplicationScenariosMM extends TestStatsReplicationScenarios {
@Rule
public final TestName testName = new TestName();
- protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
- static WarehouseInstance primary;
- private static WarehouseInstance replica;
- private String primaryDbName, replicatedDbName;
- private static HiveConf conf;
-
@BeforeClass
public static void classLevelSetup() throws Exception {
Map<String, String> overrides = new HashMap<>();
overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
GzipJSONMessageEncoder.class.getCanonicalName());
+ overrides.put(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "true");
+ overrides.put(HiveConf.ConfVars.HIVE_TXN_MANAGER.varname,
+ "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+ overrides.put(MetastoreConf.ConfVars.CAPABILITY_CHECK.getHiveName(),"false");
+ overrides.put(HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT.varname,"1s");
+ overrides.put(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE.varname, "nonstrict");
+
- internalBeforeClassSetup(overrides, TestReplicationScenarios.class, false);
+ internalBeforeClassSetup(overrides, overrides, TestStatsReplicationScenariosMM.class, true,
+ AcidTableKind.INSERT_ONLY);
}
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMMNoAutogather.java
similarity index 65%
copy from itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
copy to itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMMNoAutogather.java
index 51f8dfb..e1ee7b2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMMNoAutogather.java
@@ -23,31 +23,32 @@ import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncod
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
/**
- * Tests statistics replication when statistics are collected using ANALYZE command.
+ * Tests statistics replication for ACID tables.
*/
-public class TestStatsReplicationScenariosNoAutogather extends TestStatsReplicationScenarios {
+public class TestStatsReplicationScenariosMMNoAutogather extends TestStatsReplicationScenarios {
@Rule
public final TestName testName = new TestName();
- protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
- static WarehouseInstance primary;
- private static WarehouseInstance replica;
- private String primaryDbName, replicatedDbName;
- private static HiveConf conf;
-
@BeforeClass
public static void classLevelSetup() throws Exception {
Map<String, String> overrides = new HashMap<>();
overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
GzipJSONMessageEncoder.class.getCanonicalName());
+ overrides.put(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "true");
+ overrides.put(HiveConf.ConfVars.HIVE_TXN_MANAGER.varname,
+ "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+ overrides.put(MetastoreConf.ConfVars.CAPABILITY_CHECK.getHiveName(), "false");
+ overrides.put(HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT.varname, "1s");
+ overrides.put(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE.varname, "nonstrict");
+ overrides.put("mapred.input.dir.recursive", "true");
+
- internalBeforeClassSetup(overrides, TestReplicationScenarios.class, false);
+ internalBeforeClassSetup(overrides, overrides,
+ TestStatsReplicationScenariosMMNoAutogather.class, false, AcidTableKind.INSERT_ONLY);
}
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigration.java
new file mode 100644
index 0000000..49ad718
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigration.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests statistics replication for ACID tables.
+ */
+public class TestStatsReplicationScenariosMigration extends TestStatsReplicationScenarios {
+ @Rule
+ public final TestName testName = new TestName();
+
+ @BeforeClass
+ public static void classLevelSetup() throws Exception {
+ Map<String, String> overrides = new HashMap<>();
+ overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+ GzipJSONMessageEncoder.class.getCanonicalName());
+
+ Map<String, String> replicaConfigs = new HashMap<String, String>() {{
+ put("hive.support.concurrency", "true");
+ put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+ put("hive.metastore.client.capability.check", "false");
+ put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+ put("hive.exec.dynamic.partition.mode", "nonstrict");
+ put("hive.strict.checks.bucketing", "false");
+ put("hive.mapred.mode", "nonstrict");
+ put("mapred.input.dir.recursive", "true");
+ put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+ put("hive.strict.managed.tables", "true");
+ }};
+ replicaConfigs.putAll(overrides);
+
+ Map<String, String> primaryConfigs = new HashMap<String, String>() {{
+ put("hive.metastore.client.capability.check", "false");
+ put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+ put("hive.exec.dynamic.partition.mode", "nonstrict");
+ put("hive.strict.checks.bucketing", "false");
+ put("hive.mapred.mode", "nonstrict");
+ put("mapred.input.dir.recursive", "true");
+ put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+ put("hive.support.concurrency", "false");
+ put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
+ put("hive.strict.managed.tables", "false");
+ }};
+ primaryConfigs.putAll(overrides);
+
+ internalBeforeClassSetup(primaryConfigs, replicaConfigs,
+ TestStatsReplicationScenariosMigration.class, true, null);
+ }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigrationNoAutogather.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigrationNoAutogather.java
new file mode 100644
index 0000000..3b05220
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigrationNoAutogather.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests statistics replication for ACID tables.
+ */
+public class TestStatsReplicationScenariosMigrationNoAutogather extends TestStatsReplicationScenarios {
+ @Rule
+ public final TestName testName = new TestName();
+
+ @BeforeClass
+ public static void classLevelSetup() throws Exception {
+ Map<String, String> overrides = new HashMap<>();
+ overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+ GzipJSONMessageEncoder.class.getCanonicalName());
+
+ Map<String, String> replicaConfigs = new HashMap<String, String>() {{
+ put("hive.support.concurrency", "true");
+ put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+ put("hive.metastore.client.capability.check", "false");
+ put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+ put("hive.exec.dynamic.partition.mode", "nonstrict");
+ put("hive.strict.checks.bucketing", "false");
+ put("hive.mapred.mode", "nonstrict");
+ put("mapred.input.dir.recursive", "true");
+ put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+ put("hive.strict.managed.tables", "true");
+ }};
+ replicaConfigs.putAll(overrides);
+
+ Map<String, String> primaryConfigs = new HashMap<String, String>() {{
+ put("hive.metastore.client.capability.check", "false");
+ put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+ put("hive.exec.dynamic.partition.mode", "nonstrict");
+ put("hive.strict.checks.bucketing", "false");
+ put("hive.mapred.mode", "nonstrict");
+ put("mapred.input.dir.recursive", "true");
+ put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+ put("hive.support.concurrency", "false");
+ put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
+ put("hive.strict.managed.tables", "false");
+ }};
+ primaryConfigs.putAll(overrides);
+
+ internalBeforeClassSetup(primaryConfigs, replicaConfigs,
+ TestStatsReplicationScenariosMigrationNoAutogather.class, false, null);
+ }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
index 51f8dfb..2d7e9c7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hive.ql.parse;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.junit.BeforeClass;
@@ -38,9 +37,6 @@ public class TestStatsReplicationScenariosNoAutogather extends TestStatsReplicat
protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
static WarehouseInstance primary;
- private static WarehouseInstance replica;
- private String primaryDbName, replicatedDbName;
- private static HiveConf conf;
@BeforeClass
public static void classLevelSetup() throws Exception {
@@ -48,6 +44,7 @@ public class TestStatsReplicationScenariosNoAutogather extends TestStatsReplicat
overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
GzipJSONMessageEncoder.class.getCanonicalName());
- internalBeforeClassSetup(overrides, TestReplicationScenarios.class, false);
+ internalBeforeClassSetup(overrides, overrides, TestStatsReplicationScenariosNoAutogather.class,
+ false, null);
}
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index c76d30c..e9a63f8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -483,19 +483,18 @@ public class WarehouseInstance implements Closeable {
* Get statistics for given set of columns of a given table in the given database
* @param dbName - the database where the table resides
* @param tableName - tablename whose statistics are to be retrieved
- * @param colNames - columns whose statistics is to be retrieved.
* @return - list of ColumnStatisticsObj objects in the order of the specified columns
*/
public Map<String, List<ColumnStatisticsObj>> getAllPartitionColumnStatistics(String dbName,
String tableName) throws Exception {
List<String> colNames = new ArrayList();
client.getFields(dbName, tableName).forEach(fs -> colNames.add(fs.getName()));
- return getAllPartitionColumnStatistics(dbName, tableName, colNames);
+ return client.getPartitionColumnStatistics(dbName, tableName,
+ client.listPartitionNames(dbName, tableName, (short) -1), colNames);
}
/**
- * Get statistics for given set of columns for all the partitions of a given table in the given
- * database.
+ * Get statistics for a given partition of the given table in the given database.
* @param dbName - the database where the table resides
* @param tableName - name of the partitioned table in the database
* @param colNames - columns whose statistics is to be retrieved
@@ -503,12 +502,11 @@ public class WarehouseInstance implements Closeable {
* ordered according to the given list of columns.
* @throws Exception
*/
- Map<String, List<ColumnStatisticsObj>> getAllPartitionColumnStatistics(String dbName,
- String tableName,
- List<String> colNames)
+ List<ColumnStatisticsObj> getPartitionColumnStatistics(String dbName, String tableName,
+ String partName, List<String> colNames)
throws Exception {
return client.getPartitionColumnStatistics(dbName, tableName,
- client.listPartitionNames(dbName, tableName, (short) -1), colNames);
+ Collections.singletonList(partName), colNames).get(0);
}
public List<Partition> getAllPartitions(String dbName, String tableName) throws Exception {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableDesc.java
index d9e58e9..ee32f4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableDesc.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -118,7 +117,7 @@ public class CreateTableDesc implements DDLDesc, Serializable {
List<SQLNotNullConstraint> notNullConstraints;
List<SQLDefaultConstraint> defaultConstraints;
List<SQLCheckConstraint> checkConstraints;
- private ColumnStatistics colStats;
+ private ColumnStatistics colStats; // For the sake of replication
private Long initialMmWriteId; // Initial MM write ID for CTAS and import.
// The FSOP configuration for the FSOP that is going to write initial data during ctas.
// This is not needed beyond compilation, so it is transient.
@@ -142,7 +141,7 @@ public class CreateTableDesc implements DDLDesc, Serializable {
List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys,
List<SQLUniqueConstraint> uniqueConstraints, List<SQLNotNullConstraint> notNullConstraints,
List<SQLDefaultConstraint> defaultConstraints, List<SQLCheckConstraint> checkConstraints,
- ColumnStatistics colStats) {
+ ColumnStatistics colStats, long writeId) {
this(tableName, isExternal, isTemporary, cols, partCols,
bucketCols, sortCols, numBuckets, fieldDelim, fieldEscape,
@@ -153,6 +152,7 @@ public class CreateTableDesc implements DDLDesc, Serializable {
this.databaseName = databaseName;
this.colStats = colStats;
+ this.replWriteId = writeId;
}
public CreateTableDesc(String databaseName, String tableName, boolean isExternal, boolean isTemporary,
@@ -174,7 +174,7 @@ public class CreateTableDesc implements DDLDesc, Serializable {
outputFormat, location, serName, storageHandler, serdeProps,
tblProps, ifNotExists, skewedColNames, skewedColValues,
primaryKeys, foreignKeys, uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints,
- null);
+ null, -1);
this.partColNames = partColNames;
this.isCTAS = isCTAS;
}
@@ -900,14 +900,16 @@ public class CreateTableDesc implements DDLDesc, Serializable {
colStatsDesc.setDbName(getTableName());
colStatsDesc.setDbName(getDatabaseName());
tbl.getTTable().setColStats(new ColumnStatistics(colStatsDesc, colStats.getStatsObj()));
+ // Statistics will have an associated write Id for a transactional table. We need it to
+ // update column statistics.
+ if (replWriteId > 0) {
+ tbl.getTTable().setWriteId(replWriteId);
+ }
}
- // The statistics for non-transactional tables will be obtained from the source. Do not
- // reset those on replica.
- if (replicationSpec != null && replicationSpec.isInReplicationScope() &&
- !TxnUtils.isTransactionalTable(tbl.getTTable())) {
- // Do nothing to the table statistics.
- } else {
+ // When replicating, the statistics for a table are obtained from the source. Do not
+ // reset them on the replica.
+ if (replicationSpec == null || !replicationSpec.isInReplicationScope()) {
if (!this.isCTAS && (tbl.getPath() == null || (tbl.isEmpty() && !isExternal()))) {
if (!tbl.isPartitioned() && conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
StatsSetupConst.setStatsStateForCreateTable(tbl.getTTable().getParameters(),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableOperation.java
index 24373fe..7da3d26 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableOperation.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
@@ -93,27 +92,23 @@ public class CreateTableOperation extends DDLOperation {
private void createTableReplaceMode(Table tbl, boolean replDataLocationChanged) throws HiveException {
ReplicationSpec replicationSpec = desc.getReplicationSpec();
- long writeId = 0;
+ Long writeId = 0L;
EnvironmentContext environmentContext = null;
if (replicationSpec != null && replicationSpec.isInReplicationScope()) {
if (replicationSpec.isMigratingToTxnTable()) {
// for migration we start the transaction and allocate write id in repl txn task for migration.
- String writeIdPara = context.getConf().get(ReplUtils.REPL_CURRENT_TBL_WRITE_ID);
- if (writeIdPara == null) {
+ writeId = ReplUtils.getMigrationCurrentTblWriteId(context.getConf());
+ if (writeId == null) {
throw new HiveException("DDLTask : Write id is not set in the config by open txn task for migration");
}
- writeId = Long.parseLong(writeIdPara);
} else {
writeId = desc.getReplWriteId();
}
// In case of replication statistics is obtained from the source, so do not update those
- // on replica. Since we are not replicating statisics for transactional tables, do not do
- // so for transactional tables right now.
- if (!AcidUtils.isTransactionalTable(desc)) {
- environmentContext = new EnvironmentContext();
- environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
- }
+ // on replica.
+ environmentContext = new EnvironmentContext();
+ environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
}
// In replication flow, if table's data location is changed, then set the corresponding flag in
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
index cf00d7b..9756191 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
@@ -21,11 +21,14 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.util.BitSet;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
@@ -33,7 +36,6 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Date;
-import org.apache.hadoop.hive.metastore.api.Decimal;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
import org.apache.hadoop.hive.metastore.api.utils.DecimalUtils;
@@ -46,9 +48,11 @@ import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -297,21 +301,41 @@ public class ColumnStatsUpdateTask extends Task<ColumnStatsUpdateWork> {
private int persistColumnStats(Hive db) throws HiveException, MetaException, IOException {
ColumnStatistics colStats = constructColumnStatsFromInput();
- ColumnStatisticsDesc colStatsDesc = colStats.getStatsDesc();
- // We do not support stats replication for a transactional table yet. If we are converting
- // a non-transactional table to a transactional table during replication, we might get
- // column statistics but we shouldn't update those.
- if (work.getColStats() != null &&
- AcidUtils.isTransactionalTable(getHive().getTable(colStatsDesc.getDbName(),
- colStatsDesc.getTableName()))) {
- LOG.debug("Skipped updating column stats for table " +
- TableName.getDbTable(colStatsDesc.getDbName(), colStatsDesc.getTableName()) +
- " because it is converted to a transactional table during replication.");
- return 0;
- }
-
SetPartitionsStatsRequest request =
new SetPartitionsStatsRequest(Collections.singletonList(colStats));
+
+ // Set writeId and validWriteId list for replicated statistics. getColStats() will return
+ // non-null value only during replication.
+ if (work.getColStats() != null) {
+ String dbName = colStats.getStatsDesc().getDbName();
+ String tblName = colStats.getStatsDesc().getTableName();
+ Table tbl = db.getTable(dbName, tblName);
+ long writeId = work.getWriteId();
+ // If it's a transactional table on both source and target, we will get a valid writeId
+ // associated with it. Otherwise it's a non-transactional table on the source being migrated
+ // to a transactional table on the target, and we need to craft a valid writeId here.
+ if (AcidUtils.isTransactionalTable(tbl)) {
+ ValidWriteIdList writeIds;
+ if (work.getIsMigratingToTxn()) {
+ Long tmpWriteId = ReplUtils.getMigrationCurrentTblWriteId(conf);
+ if (tmpWriteId == null) {
+ throw new HiveException("DDLTask : Write id is not set in the config by open txn task for migration");
+ }
+ writeId = tmpWriteId;
+ }
+
+ // We need a valid writeId list to update column statistics for a transactional table. We
+ // do not have a valid writeId list which was used to update the column stats on the
+ // source. But we know for sure that the writeId associated with the stats was valid then
+ // (otherwise column stats update would have failed on the source). So use a valid
+ // transaction list with only that writeId and use it to update the stats.
+ writeIds = new ValidReaderWriteIdList(TableName.getDbTable(dbName, tblName), new long[0],
+ new BitSet(), writeId);
+ request.setValidWriteIdList(writeIds.toString());
+ request.setWriteId(writeId);
+ }
+ }
+
db.setPartitionColumnStatistics(request);
return 0;
}
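A minimal sketch of the writeId-list crafting used above, assuming only the constructor visible in the patch (the wrapper class and method names are hypothetical):

    import java.util.BitSet;
    import org.apache.hadoop.hive.common.TableName;
    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
    import org.apache.hadoop.hive.common.ValidWriteIdList;

    public class CraftedWriteIdListSketch {
      // The replica does not know the writeId list that was current on the source when the
      // stats were computed, but the stats' own writeId must have been valid there. A list
      // whose high watermark is that single writeId, with no open or aborted ids, is therefore
      // a safe substitute when updating the replicated statistics.
      public static String forSingleWriteId(String dbName, String tblName, long writeId) {
        ValidWriteIdList writeIds = new ValidReaderWriteIdList(
            TableName.getDbTable(dbName, tblName), new long[0], new BitSet(), writeId);
        return writeIds.toString();
      }
    }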
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index a1d795f..7c5a47e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -1022,7 +1022,16 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
}
Partition part = db.getPartition(tbl, oldPartSpec, false);
part.setValues(renamePartitionDesc.getNewPartSpec());
- db.renamePartition(tbl, oldPartSpec, part);
+ long writeId = renamePartitionDesc.getWriteId();
+ if (renamePartitionDesc.getReplicationSpec() != null
+ && renamePartitionDesc.getReplicationSpec().isMigratingToTxnTable()) {
+ Long tmpWriteId = ReplUtils.getMigrationCurrentTblWriteId(conf);
+ if (tmpWriteId == null) {
+ throw new HiveException("DDLTask : Write id is not set in the config by open txn task for migration");
+ }
+ writeId = tmpWriteId;
+ }
+ db.renamePartition(tbl, oldPartSpec, part, writeId);
Partition newPart = db.getPartition(tbl, renamePartitionDesc.getNewPartSpec(), false);
work.getInputs().add(new ReadEntity(oldPart));
// We've already obtained a lock on the table, don't lock the partition too
@@ -2460,11 +2469,32 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
}
environmentContext.putToProperties(HiveMetaHook.ALTER_TABLE_OPERATION_TYPE, alterTbl.getOp().name());
if (allPartitions == null) {
- db.alterTable(alterTbl.getOldName(), tbl, alterTbl.getIsCascade(), environmentContext, true);
+ long writeId = alterTbl.getWriteId() != null ? alterTbl.getWriteId() : 0;
+ if (alterTbl.getReplicationSpec() != null &&
+ alterTbl.getReplicationSpec().isMigratingToTxnTable()) {
+ Long tmpWriteId = ReplUtils.getMigrationCurrentTblWriteId(conf);
+ if (tmpWriteId == null) {
+ throw new HiveException("DDLTask : Write id is not set in the config by open txn task for migration");
+ }
+ writeId = tmpWriteId;
+ }
+ db.alterTable(alterTbl.getOldName(), tbl, alterTbl.getIsCascade(), environmentContext,
+ true, writeId);
} else {
// Note: this is necessary for UPDATE_STATISTICS command, that operates via ADDPROPS (why?).
// For any other updates, we don't want to do txn check on partitions when altering table.
- boolean isTxn = alterTbl.getPartSpec() != null && alterTbl.getOp() == AlterTableTypes.ADDPROPS;
+ boolean isTxn = false;
+ if (alterTbl.getPartSpec() != null && alterTbl.getOp() == AlterTableTypes.ADDPROPS) {
+ // ADDPROPS is used to add replication properties like repl.last.id, which isn't a
+ // transactional change. In case of replication, check for transactional properties
+ // explicitly.
+ Map<String, String> props = alterTbl.getProps();
+ if (alterTbl.getReplicationSpec() != null && alterTbl.getReplicationSpec().isInReplicationScope()) {
+ isTxn = (props.get(StatsSetupConst.COLUMN_STATS_ACCURATE) != null);
+ } else {
+ isTxn = true;
+ }
+ }
db.alterPartitions(Warehouse.getQualifiedName(tbl.getTTable()), allPartitions, environmentContext, isTxn);
}
// Add constraints if necessary
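The alter-partitions branch above reduces to the decision below; a sketch under the assumption that, during replication, only the COLUMN_STATS_ACCURATE property marks a transactional stats change (class and method names hypothetical):

    import java.util.Map;
    import org.apache.hadoop.hive.common.StatsSetupConst;

    public class AddPropsTxnCheckSketch {
      // Outside replication, ADDPROPS on a partition is treated as a transactional change.
      // Inside replication it usually only sets repl.last.id, so it is transactional only
      // when column-stats accuracy is being updated.
      public static boolean isTxnAlter(boolean inReplicationScope, Map<String, String> props) {
        if (inReplicationScope) {
          return props.get(StatsSetupConst.COLUMN_STATS_ACCURATE) != null;
        }
        return true;
      }
    }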
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 1c2522d..15a266d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -284,9 +284,8 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
}
// If we are loading a table during replication, the stats will also be replicated
- // and hence accurate if it's a non-transactional table. For transactional table we
- // do not replicate stats yet.
- return AcidUtils.isTransactionalTable(table.getParameters());
+ // and hence accurate. No need to reset those.
+ return false;
}
private final static class TaskInformation {
@@ -399,11 +398,11 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
// for transactional table if write id is not set during replication from a cluster with STRICT_MANAGED set
// to false then set it now.
if (tbd.getWriteId() <= 0 && AcidUtils.isTransactionalTable(table.getParameters())) {
- String writeId = conf.get(ReplUtils.REPL_CURRENT_TBL_WRITE_ID);
+ Long writeId = ReplUtils.getMigrationCurrentTblWriteId(conf);
if (writeId == null) {
throw new HiveException("MoveTask : Write id is not set in the config by open txn task for migration");
}
- tbd.setWriteId(Long.parseLong(writeId));
+ tbd.setWriteId(writeId);
tbd.setStmtId(driverContext.getCtx().getHiveTxnManager().getStmtIdAndIncrement());
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index c34f075..dcf569f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -206,15 +206,14 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
// getDeleteDestIfExist returns true if it is repl load for replace/insert overwrite event and
// hence need to create base directory. If false, then it is repl load for regular insert into or
// load flow and hence just create delta directory.
- String writeIdString = conf.get(ReplUtils.REPL_CURRENT_TBL_WRITE_ID);
- if (writeIdString == null) {
+ Long writeId = ReplUtils.getMigrationCurrentTblWriteId(conf);
+ if (writeId == null) {
console.printError("ReplCopyTask : Write id is not set in the config by open txn task for migration");
return 6;
}
- long writeId = Long.parseLong(writeIdString);
// Set stmt id 0 for bootstrap load as the directory needs to be searched during incremental load to avoid any
// duplicate copy from the source. Check HIVE-21197 for more detail.
- int stmtId = (writeId == ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID) ?
+ int stmtId = (writeId.equals(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID)) ?
ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID :
driverContext.getCtx().getHiveTxnManager().getStmtIdAndIncrement();
toPath = new Path(toPath, AcidUtils.baseOrDeltaSubdir(work.getDeleteDestIfExist(), writeId, writeId, stmtId));
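A sketch of how the target directory is derived in the hunk above, assuming only the AcidUtils call shown in the patch (the wrapper class is hypothetical):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class MigrationCopyTargetSketch {
      // For a replace/insert-overwrite event the data lands under a base directory for the
      // given writeId, otherwise under a delta directory for that writeId and statement id.
      // Bootstrap load pins the statement id so incremental load can recognize the directory
      // and avoid copying the same data twice (see HIVE-21197).
      public static Path target(Path tablePath, boolean replaceEvent, long writeId, int stmtId) {
        return new Path(tablePath, AcidUtils.baseOrDeltaSubdir(replaceEvent, writeId, writeId, stmtId));
      }
    }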
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 3704344..eb5c18a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -367,9 +367,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
// added/modified by concurrent txns which are later than current txn. So, need to set last repl Id of this table
// as bootstrap dump's last repl Id.
tuple.replicationSpec.setCurrentReplicationState(String.valueOf(lastReplId));
-
- // For now we do not replicate stats for ACID table. So, wipe out column stats if any.
- tableSpec.tableHandle.getTTable().unsetColStats();
}
MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
new TableExport(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
index 22b6e98..27009f0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -106,11 +107,10 @@ public class FSTableEvent implements TableEvent {
// If the conversion is from non transactional to transactional table
if (AcidUtils.isTransactionalTable(table)) {
replicationSpec().setMigratingToTxnTable();
- // There won't be any writeId associated with statistics on source non-transactional
- // table. We will need to associate a cooked up writeId on target for those. But that's
- // not done yet. Till then we don't replicate statistics for ACID table even if it's
- // available on the source.
- table.getTTable().unsetColStats();
+ // For migrated tables associate bootstrap writeId when replicating stats.
+ if (table.getTTable().isSetColStats()) {
+ table.getTTable().setWriteId(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID);
+ }
}
if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
// since we have converted to an external table now after applying the migration rules the
@@ -196,15 +196,15 @@ public class FSTableEvent implements TableEvent {
}
partsDesc.setReplicationSpec(replicationSpec());
- // Right now, we do not have a way of associating a writeId with statistics for a table
- // converted to a transactional table if it was non-transactional on the source. So, do not
- // update statistics for converted tables even if available on the source.
- if (partition.isSetColStats() && !replicationSpec().isMigratingToTxnTable()) {
+ if (partition.isSetColStats()) {
ColumnStatistics colStats = partition.getColStats();
ColumnStatisticsDesc colStatsDesc = new ColumnStatisticsDesc(colStats.getStatsDesc());
colStatsDesc.setTableName(tblDesc.getTableName());
colStatsDesc.setDbName(tblDesc.getDatabaseName());
partDesc.setColStats(new ColumnStatistics(colStatsDesc, colStats.getStatsObj()));
+ long writeId = replicationSpec().isMigratingToTxnTable() ?
+ ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID : partition.getWriteId();
+ partDesc.setWriteId(writeId);
}
return partsDesc;
} catch (Exception e) {
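The writeId chosen for replicated statistics in the hunks above reduces to this selection; a sketch assuming only the ReplUtils constant referenced in the patch (helper name hypothetical):

    import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;

    public class BootstrapStatsWriteIdSketch {
      // A table that was non-transactional on the source has no real writeId attached to its
      // statistics, so the fixed bootstrap-migration writeId is used on the target; otherwise
      // the writeId captured with the source object is kept.
      public static long statsWriteId(boolean migratingToTxnTable, long sourceWriteId) {
        return migratingToTxnTable ? ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID : sourceWriteId;
      }
    }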
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 072189b..fbdbbdd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.ReplConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
+import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
import org.apache.hadoop.hive.ql.plan.DDLWork;
import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -161,6 +163,33 @@ public class ReplUtils {
return false;
}
+ public static boolean isTableMigratingToTransactional(HiveConf conf,
+ org.apache.hadoop.hive.metastore.api.Table tableObj)
+ throws TException, IOException {
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES) &&
+ !AcidUtils.isTransactionalTable(tableObj) &&
+ TableType.valueOf(tableObj.getTableType()) == TableType.MANAGED_TABLE) {
+ //TODO : isPathOwnByHive is hard coded to true, need to get it from repl dump metadata.
+ HiveStrictManagedMigration.TableMigrationOption migrationOption =
+ HiveStrictManagedMigration.determineMigrationTypeAutomatically(tableObj, TableType.MANAGED_TABLE,
+ null, conf, null, true);
+ return migrationOption == MANAGED;
+ }
+ return false;
+ }
+
+ private static void addOpenTxnTaskForMigration(String actualDbName, String actualTblName,
+ HiveConf conf,
+ UpdatedMetaDataTracker updatedMetaDataTracker,
+ List<Task<? extends Serializable>> taskList,
+ Task<? extends Serializable> childTask) {
+ Task<? extends Serializable> replTxnTask = TaskFactory.get(new ReplTxnWork(actualDbName, actualTblName,
+ ReplTxnWork.OperationType.REPL_MIGRATION_OPEN_TXN), conf);
+ replTxnTask.addDependentTask(childTask);
+ updatedMetaDataTracker.setNeedCommitTxn(true);
+ taskList.add(replTxnTask);
+ }
+
public static List<Task<? extends Serializable>> addOpenTxnTaskForMigration(String actualDbName,
String actualTblName, HiveConf conf,
UpdatedMetaDataTracker updatedMetaDataTracker,
@@ -169,25 +198,36 @@ public class ReplUtils {
throws IOException, TException {
List<Task<? extends Serializable>> taskList = new ArrayList<>();
taskList.add(childTask);
- if (conf.getBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES) && updatedMetaDataTracker != null &&
- !AcidUtils.isTransactionalTable(tableObj) &&
- TableType.valueOf(tableObj.getTableType()) == TableType.MANAGED_TABLE) {
- //TODO : isPathOwnByHive is hard coded to true, need to get it from repl dump metadata.
- HiveStrictManagedMigration.TableMigrationOption migrationOption =
- HiveStrictManagedMigration.determineMigrationTypeAutomatically(tableObj, TableType.MANAGED_TABLE,
- null, conf, null, true);
- if (migrationOption == MANAGED) {
- //if conversion to managed table.
- Task<? extends Serializable> replTxnTask = TaskFactory.get(new ReplTxnWork(actualDbName, actualTblName,
- ReplTxnWork.OperationType.REPL_MIGRATION_OPEN_TXN), conf);
- replTxnTask.addDependentTask(childTask);
- updatedMetaDataTracker.setNeedCommitTxn(true);
- taskList.add(replTxnTask);
- }
+ if (isTableMigratingToTransactional(conf, tableObj) && updatedMetaDataTracker != null) {
+ addOpenTxnTaskForMigration(actualDbName, actualTblName, conf, updatedMetaDataTracker,
+ taskList, childTask);
}
return taskList;
}
+ public static List<Task<? extends Serializable>> addTasksForLoadingColStats(ColumnStatistics colStats,
+ HiveConf conf,
+ UpdatedMetaDataTracker updatedMetadata,
+ org.apache.hadoop.hive.metastore.api.Table tableObj,
+ long writeId)
+ throws IOException, TException {
+ List<Task<? extends Serializable>> taskList = new ArrayList<>();
+ boolean isMigratingToTxn = ReplUtils.isTableMigratingToTransactional(conf, tableObj);
+ ColumnStatsUpdateWork work = new ColumnStatsUpdateWork(colStats, isMigratingToTxn);
+ work.setWriteId(writeId);
+ Task<?> task = TaskFactory.get(work, conf);
+ taskList.add(task);
+ // If the table is going to be migrated to a transactional table we will need to open
+ // and commit a transaction to associate a valid writeId with the statistics.
+ if (isMigratingToTxn) {
+ ReplUtils.addOpenTxnTaskForMigration(colStats.getStatsDesc().getDbName(),
+ colStats.getStatsDesc().getTableName(), conf, updatedMetadata, taskList,
+ task);
+ }
+
+ return taskList;
+
+ }
// Path filters to filter only events (directories) excluding "_bootstrap"
public static PathFilter getEventsDirectoryFilter(final FileSystem fs) {
return p -> {
@@ -216,4 +256,12 @@ public class ReplUtils {
envContext.putToProperties(ReplConst.REPL_DATA_LOCATION_CHANGED, ReplConst.TRUE);
return envContext;
}
+
+ public static Long getMigrationCurrentTblWriteId(HiveConf conf) {
+ String writeIdString = conf.get(ReplUtils.REPL_CURRENT_TBL_WRITE_ID);
+ if (writeIdString == null) {
+ return null;
+ }
+ return Long.parseLong(writeIdString);
+ }
}
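A minimal sketch of how callers such as MoveTask and ReplCopyTask consume the new helper above; the surrounding method is hypothetical, the helper itself comes from the patch:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class MigrationWriteIdSketch {
      // The open-txn task for migration publishes the allocated writeId through the config;
      // consumers fail fast if it is missing rather than parsing the raw string themselves.
      public static long requireMigrationWriteId(HiveConf conf) throws HiveException {
        Long writeId = ReplUtils.getMigrationCurrentTblWriteId(conf);
        if (writeId == null) {
          throw new HiveException("Write id is not set in the config by open txn task for migration");
        }
        return writeId;
      }
    }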
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 4350dc8..fdd3e46 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -42,6 +42,7 @@ import java.nio.ByteBuffer;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -665,6 +666,14 @@ public class Hive {
alterTable(null, names[0], names[1], newTbl, cascade, environmentContext, transactional);
}
+ public void alterTable(String fullyQlfdTblName, Table newTbl, boolean cascade,
+ EnvironmentContext environmentContext, boolean transactional, long writeId)
+ throws HiveException {
+ String[] names = Utilities.getDbTableName(fullyQlfdTblName);
+ alterTable(null, names[0], names[1], newTbl, cascade, environmentContext, transactional,
+ writeId);
+ }
+
public void alterTable(String catName, String dbName, String tblName, Table newTbl, boolean cascade,
EnvironmentContext environmentContext, boolean transactional) throws HiveException {
alterTable(catName, dbName, tblName, newTbl, cascade, environmentContext, transactional, 0);
@@ -694,7 +703,14 @@ public class Hive {
AcidUtils.TableSnapshot tableSnapshot = null;
if (transactional) {
if (replWriteId > 0) {
- ValidWriteIdList writeIds = AcidUtils.getTableValidWriteIdListWithTxnList(conf, dbName, tblName);
+ // We need a valid writeId list for a transactional table modification. During
+ // replication we do not have a valid writeId list which was used to modify the table
+ // on the source. But we know for sure that the writeId associated with it was valid
+ // then (otherwise modification would have failed on the source). So use a valid
+ // transaction list with only that writeId.
+ ValidWriteIdList writeIds = new ValidReaderWriteIdList(TableName.getDbTable(dbName, tblName),
+ new long[0], new BitSet(),
+ replWriteId);
tableSnapshot = new TableSnapshot(replWriteId, writeIds.writeToString());
} else {
// Make sure we pass in the names, so we can get the correct snapshot for rename table.
@@ -863,7 +879,8 @@ public class Hive {
* new partition
* @throws HiveException
*/
- public void renamePartition(Table tbl, Map<String, String> oldPartSpec, Partition newPart)
+ public void renamePartition(Table tbl, Map<String, String> oldPartSpec, Partition newPart,
+ long replWriteId)
throws HiveException {
try {
Map<String, String> newPartSpec = newPart.getSpec();
@@ -887,8 +904,21 @@ public class Hive {
}
String validWriteIds = null;
if (AcidUtils.isTransactionalTable(tbl)) {
- // Set table snapshot to api.Table to make it persistent.
- TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true);
+ TableSnapshot tableSnapshot;
+ if (replWriteId > 0) {
+ // We need a valid writeId list for a transactional table modification. During
+ // replication we do not have a valid writeId list which was used to modify the table
+ // on the source. But we know for sure that the writeId associated with it was valid
+ // then (otherwise modification would have failed on the source). So use a valid
+ // transaction list with only that writeId.
+ ValidWriteIdList writeIds = new ValidReaderWriteIdList(TableName.getDbTable(tbl.getDbName(),
+ tbl.getTableName()), new long[0], new BitSet(), replWriteId);
+ tableSnapshot = new TableSnapshot(replWriteId, writeIds.writeToString());
+ } else {
+ // Set table snapshot to api.Table to make it persistent.
+ tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true);
+ }
+
if (tableSnapshot != null) {
newPart.getTPartition().setWriteId(tableSnapshot.getWriteId());
validWriteIds = tableSnapshot.getValidWriteIdList();
@@ -987,10 +1017,14 @@ public class Hive {
tTbl.setPrivileges(principalPrivs);
}
}
- // Set table snapshot to api.Table to make it persistent.
- TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true);
- if (tableSnapshot != null) {
- tbl.getTTable().setWriteId(tableSnapshot.getWriteId());
+ // Set table snapshot to api.Table to make it persistent. A transactional table being
+ // replicated may have a valid write Id copied from the source. Use that instead of
+ // crafting one on the replica.
+ if (tTbl.getWriteId() <= 0) {
+ TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true);
+ if (tableSnapshot != null) {
+ tbl.getTTable().setWriteId(tableSnapshot.getWriteId());
+ }
}
if (primaryKeys == null && foreignKeys == null
@@ -2950,21 +2984,36 @@ private void constructOneLBLocationMap(FileStatus fSta,
int size = addPartitionDesc.getPartitionCount();
List<org.apache.hadoop.hive.metastore.api.Partition> in =
new ArrayList<org.apache.hadoop.hive.metastore.api.Partition>(size);
- AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true);
long writeId;
String validWriteIdList;
- if (tableSnapshot != null && tableSnapshot.getWriteId() > 0) {
- writeId = tableSnapshot.getWriteId();
- validWriteIdList = tableSnapshot.getValidWriteIdList();
+
+ // In case of replication, get the writeId from the source and use valid write Id list
+ // for replication.
+ if (addPartitionDesc.getReplicationSpec().isInReplicationScope() &&
+ addPartitionDesc.getPartition(0).getWriteId() > 0) {
+ writeId = addPartitionDesc.getPartition(0).getWriteId();
+ // We need a valid writeId list for a transactional change. During replication we do not
+ // have a valid writeId list which was used for this on the source. But we know for sure
+ // that the writeId associated with it was valid then (otherwise the change would have
+ // failed on the source). So use a valid transaction list with only that writeId.
+ validWriteIdList = new ValidReaderWriteIdList(TableName.getDbTable(tbl.getDbName(),
+ tbl.getTableName()),
+ new long[0], new BitSet(), writeId).writeToString();
} else {
- writeId = -1;
- validWriteIdList = null;
+ AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true);
+ if (tableSnapshot != null && tableSnapshot.getWriteId() > 0) {
+ writeId = tableSnapshot.getWriteId();
+ validWriteIdList = tableSnapshot.getValidWriteIdList();
+ } else {
+ writeId = -1;
+ validWriteIdList = null;
+ }
}
for (int i = 0; i < size; ++i) {
org.apache.hadoop.hive.metastore.api.Partition tmpPart =
convertAddSpecToMetaPartition(tbl, addPartitionDesc.getPartition(i), conf);
- if (tmpPart != null && tableSnapshot != null && tableSnapshot.getWriteId() > 0) {
- tmpPart.setWriteId(tableSnapshot.getWriteId());
+ if (tmpPart != null && writeId > 0) {
+ tmpPart.setWriteId(writeId);
}
in.add(tmpPart);
}
@@ -3004,12 +3053,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
out.add(new Partition(tbl, outPart));
}
EnvironmentContext ec = new EnvironmentContext();
- // In case of replication statistics is obtained from the source, so do not update those
- // on replica. Since we are not replicating statistics for transactional tables, do not do
- // so for a partition of a transactional table right now.
- if (!AcidUtils.isTransactionalTable(tbl)) {
- ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
- }
+ // In case of replication, statistics are obtained from the source, so do not update them
+ // on the replica.
+ ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
getMSC().alter_partitions(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(),
partsToAlter, ec, validWriteIdList, writeId);
@@ -3064,6 +3110,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
if (addSpec.getColStats() != null) {
part.setColStats(addSpec.getColStats());
+ // Statistics will have an associated write Id for a transactional table. We need it to
+ // update column statistics.
+ part.setWriteId(addSpec.getWriteId());
}
return part;
}
@@ -5068,11 +5117,16 @@ private void constructOneLBLocationMap(FileStatus fSta,
try {
ColumnStatistics colStat = request.getColStats().get(0);
ColumnStatisticsDesc statsDesc = colStat.getStatsDesc();
- Table tbl = getTable(statsDesc.getDbName(), statsDesc.getTableName());
- AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true);
- request.setValidWriteIdList(tableSnapshot != null ? tableSnapshot.getValidWriteIdList() : null);
- request.setWriteId(tableSnapshot != null ? tableSnapshot.getWriteId() : 0);
+ // In case of replication, the request already carries a valid writeId and a valid writeId
+ // list obtained from the source. Just use them.
+ if (request.getWriteId() <= 0 || request.getValidWriteIdList() == null) {
+ Table tbl = getTable(statsDesc.getDbName(), statsDesc.getTableName());
+ AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true);
+ request.setValidWriteIdList(tableSnapshot != null ? tableSnapshot.getValidWriteIdList() : null);
+ request.setWriteId(tableSnapshot != null ? tableSnapshot.getWriteId() : 0);
+ }
+
return getMSC().setPartitionColumnStatistics(request);
} catch (Exception e) {
LOG.debug(StringUtils.stringifyException(e));
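A sketch of the guard introduced above: the table snapshot is computed only when the request does not already carry replicated writeId information (the wrapper method is hypothetical; the request and AcidUtils calls are the ones in the patch):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.hadoop.hive.ql.metadata.Table;

    public class StatsRequestSnapshotSketch {
      // During replication the request already carries the writeId and the crafted writeId
      // list from the load path; the local snapshot is taken only for non-replication calls.
      public static void fillSnapshotIfNeeded(HiveConf conf, Table tbl,
          SetPartitionsStatsRequest request) throws Exception {
        if (request.getWriteId() <= 0 || request.getValidWriteIdList() == null) {
          AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true);
          request.setValidWriteIdList(tableSnapshot != null ? tableSnapshot.getValidWriteIdList() : null);
          request.setWriteId(tableSnapshot != null ? tableSnapshot.getWriteId() : 0);
        }
      }
    }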
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
index fb1c8d4..ae030d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
@@ -1119,12 +1118,6 @@ public class Table implements Serializable {
* table or during replication.
*/
public void setStatsStateLikeNewTable() {
- // We do not replicate statistics for
- // an ACID Table right now, so don't touch them right now.
- if (AcidUtils.isTransactionalTable(this)) {
- return;
- }
-
if (isPartitioned()) {
StatsSetupConst.setStatsStateForCreateTable(getParameters(), null,
StatsSetupConst.FALSE);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index cb9584c..07b40c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
@@ -288,11 +287,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
//if the conversion is from non transactional to transactional table
if (TxnUtils.isTransactionalTable(tblObj)) {
replicationSpec.setMigratingToTxnTable();
- // There won't be any writeId associated with statistics on source non-transactional
- // table. We will need to associate a cooked up writeId on target for those. But that's
- // not done yet. Till then we don't replicate statistics for ACID table even if it's
- // available on the source.
- tblObj.unsetColStats();
}
tblDesc = getBaseCreateTableDescFromTable(dbname, tblObj);
if (TableType.valueOf(tblObj.getTableType()) == TableType.EXTERNAL_TABLE) {
@@ -311,11 +305,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
boolean inReplicationScope = false;
if ((replicationSpec != null) && replicationSpec.isInReplicationScope()) {
tblDesc.setReplicationSpec(replicationSpec);
- // Statistics for a non-transactional table will be replicated separately. Don't bother
- // with it here.
- if (TxnUtils.isTransactionalTable(tblDesc.getTblProps())) {
- StatsSetupConst.setBasicStatsState(tblDesc.getTblProps(), StatsSetupConst.FALSE);
- }
inReplicationScope = true;
tblDesc.setReplWriteId(writeId);
tblDesc.setOwnerName(tblObj.getOwner());
@@ -345,13 +334,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
AddPartitionDesc partsDesc =
getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition,
replicationSpec, x.getConf());
- if (inReplicationScope) {
- // Statistics for a non-transactional table will be replicated separately. Don't bother
- // with it here.
- if (TxnUtils.isTransactionalTable(tblDesc.getTblProps())) {
- StatsSetupConst.setBasicStatsState(partsDesc.getPartition(0).getPartParams(), StatsSetupConst.FALSE);
- }
- }
partitionDescs.add(partsDesc);
}
@@ -455,6 +437,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
partDesc.setLocation(new Path(fromPath,
Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString());
}
+ if (tblDesc.getReplWriteId() != null) {
+ partDesc.setWriteId(tblDesc.getReplWriteId());
+ }
return partsDesc;
}
@@ -667,8 +652,14 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false));
}
- Task<?> addPartTask = TaskFactory.get(
- new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf());
+ Task<?> addPartTask = null;
+ if (x.getEventType() != DumpType.EVENT_COMMIT_TXN) {
+ // During replication, by the time we apply a commit transaction event, we expect the
+ // partition(s) to have already been added or altered by previous events, so there is no
+ // need to create the add-partition task again.
+ addPartTask = TaskFactory.get(
+ new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf());
+ }
MoveWork moveWork = new MoveWork(x.getInputs(), x.getOutputs(),
null, null, false);
@@ -704,15 +695,20 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (x.getEventType() == DumpType.EVENT_INSERT) {
copyTask.addDependentTask(TaskFactory.get(moveWork, x.getConf()));
} else {
- copyTask.addDependentTask(addPartTask);
+ if (addPartTask != null) {
+ copyTask.addDependentTask(addPartTask);
+ }
}
return copyTask;
}
Task<?> loadPartTask = TaskFactory.get(moveWork, x.getConf());
copyTask.addDependentTask(loadPartTask);
- addPartTask.addDependentTask(loadPartTask);
- x.getTasks().add(copyTask);
- return addPartTask;
+ if (addPartTask != null) {
+ addPartTask.addDependentTask(loadPartTask);
+ x.getTasks().add(copyTask);
+ return addPartTask;
+ }
+ return copyTask;
}
}
@@ -1225,19 +1221,19 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
lockType = WriteEntity.WriteType.DDL_SHARED;
}
- Task t = createTableTask(tblDesc, x);
table = createNewTableMetadataObject(tblDesc, true);
+ List<Task<?>> dependentTasks = null;
if (isPartitioned(tblDesc)) {
+ dependentTasks = new ArrayList<>(partitionDescs.size());
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
addPartitionDesc.setReplicationSpec(replicationSpec);
if (!replicationSpec.isMetadataOnly()) {
- t.addDependentTask(
- addSinglePartition(tblDesc, table, wh, addPartitionDesc, replicationSpec, x,
- writeId, stmtId));
+ dependentTasks.add(addSinglePartition(tblDesc, table, wh, addPartitionDesc,
+ replicationSpec, x, writeId, stmtId));
} else {
- t.addDependentTask(alterSinglePartition(tblDesc, table, wh, addPartitionDesc,
- replicationSpec, null, x));
+ dependentTasks.add(alterSinglePartition(tblDesc, table, wh, addPartitionDesc,
+ replicationSpec, null, x));
}
if (updatedMetadata != null) {
updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
@@ -1247,17 +1243,37 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
} else if (!replicationSpec.isMetadataOnly()
&& !shouldSkipDataCopyInReplScope(tblDesc, replicationSpec)) {
x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
- t.addDependentTask(loadTable(fromURI, table, replicationSpec.isReplace(),
- new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId));
+ dependentTasks = new ArrayList<>(1);
+ dependentTasks.add(loadTable(fromURI, table, replicationSpec.isReplace(),
+ new Path(tblDesc.getLocation()), replicationSpec,
+ x, writeId, stmtId));
}
- if (dropTblTask != null) {
- // Drop first and then create
- dropTblTask.addDependentTask(t);
- x.getTasks().add(dropTblTask);
+ // During replication, by the time we replay a commit transaction event, the table should
+ // already have been created while replaying previous events, so there is no need to
+ // create the table again.
+ if (x.getEventType() != DumpType.EVENT_COMMIT_TXN) {
+ Task t = createTableTask(tblDesc, x);
+ if (dependentTasks != null) {
+ dependentTasks.forEach(task -> t.addDependentTask(task));
+ }
+ if (dropTblTask != null) {
+ // Drop first and then create
+ dropTblTask.addDependentTask(t);
+ x.getTasks().add(dropTblTask);
+ } else {
+ // Simply create
+ x.getTasks().add(t);
+ }
} else {
- // Simply create
- x.getTasks().add(t);
+ // We should not need to create a drop-table task when replaying a commit transaction
+ // event; that should have been done when replaying the create-table event itself.
+ assert dropTblTask == null;
+
+ // Add all the tasks created above directly
+ if (dependentTasks != null) {
+ x.getTasks().addAll(dependentTasks);
+ }
}
} else {
// If table of current event has partition flag different from existing table, it means, some
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index 4cd4d70..c2e26f0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -109,16 +108,8 @@ public class TableExport {
if (replicationSpec.isMetadataOnly()) {
return null;
} else {
- // For transactional tables, we do not replicate statistics right now, so don't
- // include statistics in Partition object as well.
- boolean getColStats;
- if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) {
- getColStats = false;
- } else {
- getColStats = true;
- }
return new PartitionIterable(db, tableSpec.tableHandle, null, conf.getIntVar(
- HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX), getColStats);
+ HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX), true);
}
} else {
// PARTITIONS specified - partitions inside tableSpec
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java
index 79e1361..54fc7a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.UpdatePartitionColumnStatMessage;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
@@ -53,12 +52,6 @@ class UpdatePartColStatHandler extends AbstractEventHandler<UpdatePartitionColum
return;
}
- // For now we do not dump statistics for a transactional table since replicating the same is
- // not supported.
- if (AcidUtils.isTransactionalTable(tableObj)) {
- return;
- }
-
if (!Utils.shouldReplicate(withinContext.replicationSpec, new Table(tableObj), true,
withinContext.hiveConf)) {
return;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
index ca9af5e..62db959 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.UpdateTableColumnStatMessage;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
@@ -46,10 +45,6 @@ class UpdateTableColStatHandler extends AbstractEventHandler<UpdateTableColumnSt
if (withinContext.replicationSpec.isMetadataOnly()) {
return;
}
- // For now we do not replicate the statistics for transactional tables.
- if (AcidUtils.isTransactionalTable(qlMdTable)) {
- return;
- }
DumpMetaData dmd = withinContext.createDmd(this);
dmd.setPayload(eventMessageAsJSON);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
index 1125f69..9c66210 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
@@ -18,15 +18,17 @@
package org.apache.hadoop.hive.ql.parse.repl.load.message;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.DDLWork;
import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc;
import java.io.Serializable;
-import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -44,26 +46,34 @@ public class RenamePartitionHandler extends AbstractMessageHandler {
Map<String, String> newPartSpec = new LinkedHashMap<>();
Map<String, String> oldPartSpec = new LinkedHashMap<>();
String tableName = actualDbName + "." + actualTblName;
+ Table tableObj;
+ ReplicationSpec replicationSpec = context.eventOnlyReplicationSpec();
try {
Iterator<String> beforeIterator = msg.getPtnObjBefore().getValuesIterator();
Iterator<String> afterIterator = msg.getPtnObjAfter().getValuesIterator();
- for (FieldSchema fs : msg.getTableObj().getPartitionKeys()) {
+ tableObj = msg.getTableObj();
+ for (FieldSchema fs : tableObj.getPartitionKeys()) {
oldPartSpec.put(fs.getName(), beforeIterator.next());
newPartSpec.put(fs.getName(), afterIterator.next());
}
+ if (ReplUtils.isTableMigratingToTransactional(context.hiveConf, tableObj)) {
+ replicationSpec.setMigratingToTxnTable();
+ }
+
+ RenamePartitionDesc renamePtnDesc = new RenamePartitionDesc(
+ tableName, oldPartSpec, newPartSpec, replicationSpec, null);
+ renamePtnDesc.setWriteId(msg.getWriteId());
+ Task<DDLWork> renamePtnTask = TaskFactory.get(
+ new DDLWork(readEntitySet, writeEntitySet, renamePtnDesc), context.hiveConf);
+ context.log.debug("Added rename ptn task : {}:{}->{}",
+ renamePtnTask.getId(), oldPartSpec, newPartSpec);
+ updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, newPartSpec);
+ return ReplUtils.addOpenTxnTaskForMigration(actualDbName, actualTblName,
+ context.hiveConf, updatedMetadata, renamePtnTask, tableObj);
} catch (Exception e) {
throw (e instanceof SemanticException)
- ? (SemanticException) e
- : new SemanticException("Error reading message members", e);
+ ? (SemanticException) e
+ : new SemanticException("Error reading message members", e);
}
-
- RenamePartitionDesc renamePtnDesc = new RenamePartitionDesc(
- tableName, oldPartSpec, newPartSpec, context.eventOnlyReplicationSpec(), null);
- Task<DDLWork> renamePtnTask = TaskFactory.get(
- new DDLWork(readEntitySet, writeEntitySet, renamePtnDesc), context.hiveConf);
- context.log.debug("Added rename ptn task : {}:{}->{}",
- renamePtnTask.getId(), oldPartSpec, newPartSpec);
- updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, newPartSpec);
- return Collections.singletonList(renamePtnTask);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
index ddf2ca1..53d9982 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
@@ -21,13 +21,14 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
import org.apache.hadoop.hive.ql.plan.DDLWork;
import org.apache.hadoop.hive.ql.stats.StatsUtils;
import java.io.Serializable;
-import java.util.Collections;
import java.util.List;
public class RenameTableHandler extends AbstractMessageHandler {
@@ -61,8 +62,13 @@ public class RenameTableHandler extends AbstractMessageHandler {
String oldName = StatsUtils.getFullyQualifiedTableName(oldDbName, tableObjBefore.getTableName());
String newName = StatsUtils.getFullyQualifiedTableName(newDbName, tableObjAfter.getTableName());
+ ReplicationSpec replicationSpec = context.eventOnlyReplicationSpec();
+ if (ReplUtils.isTableMigratingToTransactional(context.hiveConf, tableObjAfter)) {
+ replicationSpec.setMigratingToTxnTable();
+ }
AlterTableDesc renameTableDesc = new AlterTableDesc(
- oldName, newName, false, context.eventOnlyReplicationSpec());
+ oldName, newName, false, replicationSpec);
+ renameTableDesc.setWriteId(msg.getWriteId());
Task<DDLWork> renameTableTask = TaskFactory.get(
new DDLWork(readEntitySet, writeEntitySet, renameTableDesc), context.hiveConf);
context.log.debug("Added rename table task : {}:{}->{}",
@@ -75,7 +81,8 @@ public class RenameTableHandler extends AbstractMessageHandler {
// Note : edge-case here in interaction with table-level REPL LOAD, where that nukes out
// tablesUpdated. However, we explicitly don't support repl of that sort, and error out above
// if so. If that should ever change, this will need reworking.
- return Collections.singletonList(renameTableTask);
+ return ReplUtils.addOpenTxnTaskForMigration(oldDbName, tableObjBefore.getTableName(),
+ context.hiveConf, updatedMetadata, renameTableTask, tableObjAfter);
} catch (Exception e) {
throw (e instanceof SemanticException)
? (SemanticException) e
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdatePartColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdatePartColStatHandler.java
index 02e938e..cb85f7d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdatePartColStatHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdatePartColStatHandler.java
@@ -21,12 +21,10 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.messaging.UpdatePartitionColumnStatMessage;
import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
import java.io.Serializable;
-import java.util.Collections;
import java.util.List;
/**
@@ -51,7 +49,12 @@ public class UpdatePartColStatHandler extends AbstractMessageHandler {
updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName,
null);
}
- return Collections.singletonList(TaskFactory.get(new ColumnStatsUpdateWork(colStats),
- context.hiveConf));
+
+ try {
+ return ReplUtils.addTasksForLoadingColStats(colStats, context.hiveConf, updatedMetadata,
+ upcsm.getTableObject(), upcsm.getWriteId());
+ } catch(Exception e) {
+ throw new SemanticException(e);
+ }
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java
index 9a60de4..371429e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java
@@ -21,12 +21,10 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.messaging.UpdateTableColumnStatMessage;
import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
import java.io.Serializable;
-import java.util.Collections;
import java.util.List;
/**
@@ -52,11 +50,11 @@ public class UpdateTableColStatHandler extends AbstractMessageHandler {
context.tableName, null);
}
- // TODO: For txn stats update, ColumnStatsUpdateTask.execute()->Hive
- // .setPartitionColumnStatistics expects a valid writeId allocated by the current txn and
- // also, there should be a table snapshot. But, it won't be there as update from
- // ReplLoadTask which doesn't have a write id allocated. Need to check this further.
- return Collections.singletonList(TaskFactory.get(new ColumnStatsUpdateWork(colStats),
- context.hiveConf));
+ try {
+ return ReplUtils.addTasksForLoadingColStats(colStats, context.hiveConf, updatedMetadata,
+ utcsm.getTableObject(), utcsm.getWriteId());
+ } catch(Exception e) {
+ throw new SemanticException(e);
+ }
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java
index 26cb217..8ea857e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java
@@ -60,6 +60,7 @@ public class AddPartitionDesc extends DDLDesc implements Serializable {
List<String> bucketCols = null;
List<Order> sortCols = null;
ColumnStatistics colStats = null;
+ long writeId = -1;
public Map<String, String> getPartSpec() {
return partSpec;
@@ -151,6 +152,10 @@ public class AddPartitionDesc extends DDLDesc implements Serializable {
public ColumnStatistics getColStats() { return colStats; }
public void setColStats(ColumnStatistics colStats) { this.colStats = colStats; }
+
+ public long getWriteId() { return writeId; }
+
+ public void setWriteId(long writeId) { this.writeId = writeId; }
}
private static final long serialVersionUID = 1L;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
index 1219b62..4a14246 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
@@ -44,6 +44,8 @@ public class ColumnStatsUpdateWork implements Serializable, DDLDescWithWriteId {
private final String colName;
private final String colType;
private final ColumnStatistics colStats;
+ private final boolean isMigratingToTxn; // Is the table for which we are updating stats going
+ // to be migrated during replication.
private long writeId;
public ColumnStatsUpdateWork(String partName,
@@ -59,10 +61,12 @@ public class ColumnStatsUpdateWork implements Serializable, DDLDescWithWriteId {
this.colName = colName;
this.colType = colType;
this.colStats = null;
+ this.isMigratingToTxn = false;
}
- public ColumnStatsUpdateWork(ColumnStatistics colStats) {
+ public ColumnStatsUpdateWork(ColumnStatistics colStats, boolean isMigratingToTxn) {
this.colStats = colStats;
+ this.isMigratingToTxn = isMigratingToTxn;
this.partName = null;
this.mapProp = null;
this.dbName = null;
@@ -102,11 +106,15 @@ public class ColumnStatsUpdateWork implements Serializable, DDLDescWithWriteId {
public ColumnStatistics getColStats() { return colStats; }
+ public boolean getIsMigratingToTxn() { return isMigratingToTxn; }
+
@Override
public void setWriteId(long writeId) {
this.writeId = writeId;
}
+ public long getWriteId() { return writeId; }
+
@Override
public String getFullTableName() {
return dbName + "." + tableName;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
index 8db5d33..dd3af1b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
@@ -88,7 +88,8 @@ public class ImportTableDesc {
null,
null,
null,
- table.getColStats());
+ table.getColStats(),
+ table.getTTable().getWriteId());
this.createTblDesc.setStoredAsSubDirectories(table.getSd().isStoredAsSubDirectories());
break;
case VIEW:
@@ -382,4 +383,11 @@ public class ImportTableDesc {
throw new RuntimeException("Invalid table type : " + getDescType());
}
}
+
+ public Long getReplWriteId() {
+ if (this.createTblDesc != null) {
+ return this.createTblDesc.getReplWriteId();
+ }
+ return -1L;
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/RenamePartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/RenamePartitionDesc.java
index a4a31a5..b4edbfe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/RenamePartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/RenamePartitionDesc.java
@@ -127,6 +127,8 @@ public class RenamePartitionDesc extends DDLDesc implements Serializable, DDLDes
this.writeId = writeId;
}
+ public long getWriteId() { return writeId; }
+
@Override
public String getFullTableName() {
return fqTableName;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 854c85f..f1983c5 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -40,6 +40,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -78,6 +79,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
import org.apache.hadoop.hive.common.ZKDeRegisterWatcher;
import org.apache.hadoop.hive.metastore.api.*;
@@ -1894,6 +1897,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
List<SQLCheckConstraint> checkConstraints)
throws AlreadyExistsException, MetaException,
InvalidObjectException, NoSuchObjectException, InvalidInputException {
+
+ ColumnStatistics colStats = null;
+ // If the given table has column statistics, save them here and update them later.
+ // We don't want them to be part of the Table object being created, lest the create-table
+ // notification event also carry the column stats.
+ if (tbl.isSetColStats()) {
+ colStats = tbl.getColStats();
+ tbl.unsetColStats();
+ }
+
// To preserve backward compatibility throw MetaException in case of null database
if (tbl.getDbName() == null) {
throw new MetaException("Null database name is not allowed");
@@ -2128,13 +2141,23 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
- // If the table has column statistics, update it into the metastore. This feature is used
- // by replication to replicate table level statistics.
- if (tbl.isSetColStats()) {
- // We do not replicate statistics for a transactional table right now and hence we do not
- // expect a transactional table to have column statistics here. So passing null
- // validWriteIds is fine for now.
- updateTableColumnStatsInternal(tbl.getColStats(), null, tbl.getWriteId());
+ // If the table has column statistics, update them in the metastore. A valid writeId list
+ // is needed to update column statistics for a transactional table, but during bootstrap
+ // replication, where this feature is used, the writeId list originally used to update the
+ // stats is not available. We do know the writeId associated with the stats was valid then
+ // (otherwise the stats update would have failed on the source). So, craft a valid writeId
+ // list containing only that writeId and use it to update the stats.
+ if (colStats != null) {
+ long writeId = tbl.getWriteId();
+ String validWriteIds = null;
+ if (writeId > 0) {
+ ValidWriteIdList validWriteIdList =
+ new ValidReaderWriteIdList(TableName.getDbTable(tbl.getDbName(),
+ tbl.getTableName()),
+ new long[0], new BitSet(), writeId);
+ validWriteIds = validWriteIdList.toString();
+ }
+ updateTableColumnStatsInternal(colStats, validWriteIds, tbl.getWriteId());
}
}
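The single-writeId trick above can be seen in isolation. A minimal, self-contained sketch (database and table names are made up for illustration) of crafting a ValidReaderWriteIdList whose high-watermark is the writeId carried with the replicated stats, and turning it into the validWriteIds string the stats-update path expects:

import java.util.BitSet;

import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;

public class SingleWriteIdListSketch {
  public static void main(String[] args) {
    long writeId = 7L; // the writeId that accompanied the replicated column statistics
    String fullTableName = TableName.getDbTable("repl_db", "acid_tbl");
    // No exception writeIds, no aborted bits, high-watermark == writeId: everything up to
    // and including writeId is treated as committed, which is exactly what the source
    // guaranteed when it wrote the stats.
    ValidWriteIdList validWriteIdList =
        new ValidReaderWriteIdList(fullTableName, new long[0], new BitSet(), writeId);
    String validWriteIds = validWriteIdList.toString();
    System.out.println(validWriteIds); // the string handed to the column-stats update call
  }
}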
@@ -3476,6 +3499,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
Map<String, String> transactionalListenerResponses = Collections.emptyMap();
Database db = null;
+ List<ColumnStatistics> partsColStats = new ArrayList<>(parts.size());
+ List<Long> partsWriteIds = new ArrayList<>(parts.size());
+
try {
ms.openTransaction();
tbl = ms.getTable(catName, dbName, tblName, null);
@@ -3495,6 +3521,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
List<Partition> partitionsToAdd = new ArrayList<>(parts.size());
List<FieldSchema> partitionKeys = tbl.getPartitionKeys();
for (final Partition part : parts) {
+ // Collect partition column stats to be updated, if present. Partition objects passed
+ // down here during replication may carry statistics that need to be updated in the
+ // metastore. But we don't want the stats to be part of the Partition object while it is
+ // being created or altered; otherwise they would become part of the notification event.
+ if (part.isSetColStats()) {
+ partsColStats.add(part.getColStats());
+ part.unsetColStats();
+ partsWriteIds.add(part.getWriteId());
+ }
+
// Iterate through the partitions and validate them. If one of the partitions is
// incorrect, an exception will be thrown before the threads which create the partition
// folders are submitted. This way we can be sure that no partition and no partition
@@ -3538,11 +3574,24 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
- // Update partition column statistics if available
- for (Partition newPart : newParts) {
- if (newPart.isSetColStats()) {
- updatePartitonColStatsInternal(tbl, newPart.getColStats(), null, newPart.getWriteId());
+ // Update partition column statistics if available. A valid writeId list is needed to
+ // update column statistics for a transactional table, but during bootstrap replication,
+ // where this feature is used, the writeId list originally used to update the stats is not
+ // available. We do know the writeId associated with the stats was valid then (otherwise
+ // the stats update would have failed on the source). So, craft a valid writeId list
+ // containing only that writeId and use it to update the stats.
+ int cnt = 0;
+ for (ColumnStatistics partColStats: partsColStats) {
+ long writeId = partsWriteIds.get(cnt++);
+ String validWriteIds = null;
+ if (writeId > 0) {
+ ValidWriteIdList validWriteIdList =
+ new ValidReaderWriteIdList(TableName.getDbTable(tbl.getDbName(),
+ tbl.getTableName()),
+ new long[0], new BitSet(), writeId);
+ validWriteIds = validWriteIdList.toString();
}
+ updatePartitonColStatsInternal(tbl, partColStats, validWriteIds, writeId);
}
success = ms.commitTransaction();
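The partition path applies the same idea, but first strips the stats off each Partition before the objects feed partition creation and its notification events, remembering them in two parallel lists (the stats plus the writeId each was computed under) and replaying the updates after the partitions exist. A compact sketch of that strip-then-reapply pattern, using hypothetical stand-in types instead of the real metastore classes:

import java.util.ArrayList;
import java.util.List;

// Stand-in types; the real code uses Partition and ColumnStatistics from the metastore API.
public class StripThenReapplySketch {
  static class Stats { }
  static class Part {
    Stats colStats;   // nullable, mirroring isSetColStats()/unsetColStats() in Thrift
    long writeId;
  }

  static void addPartitions(List<Part> parts) {
    List<Stats> pendingStats = new ArrayList<>();
    List<Long> pendingWriteIds = new ArrayList<>();
    for (Part p : parts) {
      if (p.colStats != null) {
        pendingStats.add(p.colStats);     // remember the stats...
        pendingWriteIds.add(p.writeId);   // ...and the writeId they were computed under
        p.colStats = null;                // ...so they never ride on the notification event
      }
      // validate and create the partition, fire the add-partition events, etc.
    }
    for (int i = 0; i < pendingStats.size(); i++) {
      long writeId = pendingWriteIds.get(i);
      // when writeId > 0, craft a single-writeId ValidWriteIdList (as in the sketch above)
      // and update the partition's column statistics with it
    }
  }

  public static void main(String[] args) {
    addPartitions(new ArrayList<>());   // no-op demo; the shape of the flow is the point
  }
}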
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index f9a4e48..391915a 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -4180,8 +4180,8 @@ public class ObjectStore implements RawStore, Configurable {
boolean isToTxn = isTxn && !TxnUtils.isTransactionalTable(oldt.getParameters());
if (!isToTxn && isTxn && areTxnStatsSupported) {
// Transactional table is altered without a txn. Make sure there are no changes to the flag.
- String errorMsg = verifyStatsChangeCtx(oldt.getParameters(), newTable.getParameters(),
- newTable.getWriteId(), queryValidWriteIds, false);
+ String errorMsg = verifyStatsChangeCtx(TableName.getDbTable(name, dbname), oldt.getParameters(),
+ newTable.getParameters(), newTable.getWriteId(), queryValidWriteIds, false);
if (errorMsg != null) {
throw new MetaException(errorMsg);
}
@@ -4238,8 +4238,8 @@ public class ObjectStore implements RawStore, Configurable {
* Verifies that the stats JSON string is unchanged for alter table (txn stats).
* @return Error message with the details of the change, or null if the value has not changed.
*/
- public static String verifyStatsChangeCtx(Map<String, String> oldP, Map<String, String> newP,
- long writeId, String validWriteIds, boolean isColStatsChange) {
+ public static String verifyStatsChangeCtx(String fullTableName, Map<String, String> oldP, Map<String, String> newP,
+ long writeId, String validWriteIds, boolean isColStatsChange) {
if (validWriteIds != null && writeId > 0) return null; // We have txn context.
String oldVal = oldP == null ? null : oldP.get(StatsSetupConst.COLUMN_STATS_ACCURATE);
String newVal = newP == null ? null : newP.get(StatsSetupConst.COLUMN_STATS_ACCURATE);
@@ -4255,9 +4255,10 @@ public class ObjectStore implements RawStore, Configurable {
// Some change to the stats state is being made; it can only be made with a write ID.
// Note - we could do this: if (writeId > 0 && (validWriteIds != null || !StatsSetupConst.areBasicStatsUptoDate(newP))) { return null;
// However the only way ID list can be absent is if WriteEntity wasn't generated for the alter, which is a separate bug.
- return "Cannot change stats state for a transactional table without providing the transactional"
- + " write state for verification (new write ID " + writeId + ", valid write IDs "
- + validWriteIds + "; current state " + oldVal + "; new state " + newVal;
+ return "Cannot change stats state for a transactional table " + fullTableName + " without " +
+ "providing the transactional write state for verification (new write ID " +
+ writeId + ", valid write IDs " + validWriteIds + "; current state " + oldVal + "; new" +
+ " state " + newVal;
}
@Override
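Since the method keeps its early return when a real transactional context is supplied, the new first parameter only surfaces in the error text. A small sketch of calling the updated signature (the maps, writeId, and validWriteIds value here are made up; any non-null validWriteIds string together with a positive writeId short-circuits to null):

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.metastore.ObjectStore;

public class VerifyStatsChangeCtxSketch {
  public static void main(String[] args) {
    Map<String, String> oldParams = new HashMap<>();
    Map<String, String> newParams = new HashMap<>();
    // A transactional context is present (non-null validWriteIds, writeId > 0),
    // so the check passes and null is returned.
    String err = ObjectStore.verifyStatsChangeCtx("repl_db.acid_tbl", oldParams, newParams,
        42L, "any-non-null-write-id-list", /* isColStatsChange */ true);
    System.out.println(err); // null
    // Without that context, a detected stats-state change now produces a message that
    // names repl_db.acid_tbl, which is the point of the added parameter.
  }
}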
@@ -4319,8 +4320,9 @@ public class ObjectStore implements RawStore, Configurable {
boolean isTxn = TxnUtils.isTransactionalTable(table.getParameters());
if (isTxn && areTxnStatsSupported) {
// Transactional table is altered without a txn. Make sure there are no changes to the flag.
- String errorMsg = verifyStatsChangeCtx(oldp.getParameters(), newPart.getParameters(),
- newPart.getWriteId(), validWriteIds, false);
+ String errorMsg = verifyStatsChangeCtx(TableName.getDbTable(dbname, name),
+ oldp.getParameters(),
+ newPart.getParameters(), newPart.getWriteId(), validWriteIds, false);
if (errorMsg != null) {
throw new MetaException(errorMsg);
}
@@ -8525,7 +8527,7 @@ public class ObjectStore implements RawStore, Configurable {
if (!areTxnStatsSupported) {
StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
} else {
- String errorMsg = verifyStatsChangeCtx(
+ String errorMsg = verifyStatsChangeCtx(TableName.getDbTable(dbname, name),
oldt.getParameters(), newParams, writeId, validWriteIds, true);
if (errorMsg != null) {
throw new MetaException(errorMsg);
@@ -8620,8 +8622,9 @@ public class ObjectStore implements RawStore, Configurable {
if (!areTxnStatsSupported) {
StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
} else {
- String errorMsg = verifyStatsChangeCtx(
- mPartition.getParameters(), newParams, writeId, validWriteIds, true);
+ String errorMsg = verifyStatsChangeCtx(TableName.getDbTable(statsDesc.getDbName(),
+ statsDesc.getTableName()),
+ mPartition.getParameters(), newParams, writeId, validWriteIds, true);
if (errorMsg != null) {
throw new MetaException(errorMsg);
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index 3564efe..e366ebd 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -2067,7 +2067,7 @@ public class CachedStore implements RawStore, Configurable {
if (!areTxnStatsSupported) {
StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
} else {
- String errorMsg = ObjectStore.verifyStatsChangeCtx(
+ String errorMsg = ObjectStore.verifyStatsChangeCtx(TableName.getDbTable(dbName, tblName),
table.getParameters(), newParams, writeId, validWriteIds, true);
if (errorMsg != null) {
throw new MetaException(errorMsg);
diff --git a/testutils/ptest2/conf/deployed/master-mr2.properties b/testutils/ptest2/conf/deployed/master-mr2.properties
index 9166f4a..66c23ab 100644
--- a/testutils/ptest2/conf/deployed/master-mr2.properties
+++ b/testutils/ptest2/conf/deployed/master-mr2.properties
@@ -68,7 +68,15 @@ ut.service.batchSize=8
unitTests.module.itests.hive-unit=itests.hive-unit
ut.itests.hive-unit.batchSize=9
-ut.itests.hive-unit.skipBatching=TestAcidOnTezWithSplitUpdate TestAcidOnTez TestMTQueries TestCompactor TestSchedulerQueue TestOperationLoggingAPIWithTez TestSSL TestJdbcDriver2 TestJdbcWithMiniHA TestJdbcWithMiniMr TestReplicationScenariosIncrementalLoadAcidTables TestReplIncrementalLoadAcidTablesWithJsonMessage TestReplicationScenarios TestReplWithJsonMessageFormat TestReplWithJsonMessageFormat
+ut.itests.hive-unit.skipBatching=TestAcidOnTezWithSplitUpdate TestAcidOnTez TestMTQueries \
+ TestCompactor TestSchedulerQueue TestOperationLoggingAPIWithTez TestSSL TestJdbcDriver2 \
+ TestJdbcWithMiniHA TestJdbcWithMiniMr TestReplicationScenariosIncrementalLoadAcidTables \
+ TestReplIncrementalLoadAcidTablesWithJsonMessage TestReplicationScenarios \
+ TestReplWithJsonMessageFormat TestReplWithJsonMessageFormat \
+ TestStatsReplicationScenariosACIDNoAutogather TestStatsReplicationScenariosMMNoAutogather \
+ TestStatsReplicationScenariosACID TestStatsReplicationScenariosMM \
+ TestStatsReplicationScenariosMigrationNoAutogather TestStatsReplicationScenariosMigration \
+ TestStatsReplicationScenariosNoAutogather TestStatsReplicationScenarios
unitTests.module.itests.qtest=itests.qtest
ut.itests.qtest.batchSize=9
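The skipBatching value is now wrapped with backslash continuations purely for readability; assuming ptest reads this file with standard java.util.Properties semantics, the continuation lines are joined back into a single space-separated list. A minimal sketch demonstrating that joining behaviour on a made-up two-line value:

import java.io.StringReader;
import java.util.Properties;

public class PropertiesContinuationSketch {
  public static void main(String[] args) throws Exception {
    // A trailing backslash continues the logical line; leading whitespace on the
    // continuation line is dropped when the value is parsed.
    String text = "ut.itests.hive-unit.skipBatching=TestAcidOnTez TestMTQueries \\\n"
        + "  TestCompactor TestSSL\n";
    Properties props = new Properties();
    props.load(new StringReader(text));
    // Prints: TestAcidOnTez TestMTQueries TestCompactor TestSSL
    System.out.println(props.getProperty("ut.itests.hive-unit.skipBatching"));
  }
}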