Posted to commits@hive.apache.org by an...@apache.org on 2020/08/12 06:11:03 UTC
[hive] branch master updated: HIVE-23995:Don't set location for managed tables in case of replication (Aasha Medhi, reviewed by Pravin Kumar Sinha)
This is an automated email from the ASF dual-hosted git repository.
anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a430ac3 HIVE-23995:Don't set location for managed tables in case of replication (Aasha Medhi, reviewed by Pravin Kumar Sinha)
a430ac3 is described below
commit a430ac31441ad2d6a03dd24e3141f42c79e022f4
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Wed Aug 12 11:40:01 2020 +0530
HIVE-23995:Don't set location for managed tables in case of replication (Aasha Medhi, reviewed by Pravin Kumar Sinha)
---
bin/ext/strictmanagedmigration.sh | 27 -
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 2 +-
.../hcatalog/api/repl/commands/TestCommands.java | 1 +
.../parse/BaseReplicationScenariosAcidTables.java | 1 +
.../hadoop/hive/ql/parse/TestExportImport.java | 1 +
.../TestReplTableMigrationWithJsonFormat.java | 35 -
.../ql/parse/TestReplWithJsonMessageFormat.java | 2 +-
.../hive/ql/parse/TestReplicationScenarios.java | 35 +-
.../parse/TestReplicationScenariosAcidTables.java | 1 +
.../TestReplicationScenariosExternalTables.java | 1 +
.../parse/TestReplicationWithTableMigration.java | 577 -------
.../parse/TestReplicationWithTableMigrationEx.java | 456 ------
.../ql/parse/TestStatsReplicationScenarios.java | 1 +
.../TestStatsReplicationScenariosMigration.java | 72 -
...sReplicationScenariosMigrationNoAutogather.java | 72 -
.../hadoop/hive/llap/AsyncResponseHandlerTest.java | 1 +
.../ql/ddl/table/AbstractAlterTableOperation.java | 7 -
.../ql/ddl/table/create/CreateTableOperation.java | 10 +-
.../rename/AlterTableRenamePartitionOperation.java | 7 -
.../hadoop/hive/ql/exec/ColumnStatsUpdateTask.java | 10 +-
.../org/apache/hadoop/hive/ql/exec/MoveTask.java | 11 -
.../apache/hadoop/hive/ql/exec/ReplCopyTask.java | 117 +-
.../apache/hadoop/hive/ql/exec/ReplTxnTask.java | 26 -
.../apache/hadoop/hive/ql/exec/repl/AckTask.java | 4 +
.../bootstrap/events/filesystem/FSTableEvent.java | 42 +-
.../ql/exec/repl/bootstrap/load/LoadDatabase.java | 3 +-
.../repl/bootstrap/load/table/LoadPartitions.java | 41 +-
.../exec/repl/bootstrap/load/table/LoadTable.java | 48 +-
.../incremental/IncrementalLoadTasksBuilder.java | 80 +-
.../hadoop/hive/ql/exec/repl/util/ReplUtils.java | 68 +-
.../hive/ql/parse/ImportSemanticAnalyzer.java | 98 +-
.../hadoop/hive/ql/parse/ReplicationSpec.java | 16 -
.../repl/load/message/RenamePartitionHandler.java | 7 +-
.../repl/load/message/RenameTableHandler.java | 8 +-
.../load/message/TruncatePartitionHandler.java | 4 +-
.../repl/load/message/TruncateTableHandler.java | 4 +-
.../hadoop/hive/ql/plan/ColumnStatsUpdateWork.java | 8 +-
.../apache/hadoop/hive/ql/plan/ReplCopyWork.java | 10 -
.../apache/hadoop/hive/ql/plan/ReplTxnWork.java | 14 +-
.../hive/ql/util/HiveStrictManagedMigration.java | 1715 --------------------
.../hadoop/hive/ql/exec/util/TestRetryable.java | 2 +-
.../ql/util/TestHiveStrictManagedMigration.java | 309 ----
.../queries/clientpositive/repl_2_exim_basic.q | 1 +
.../queries/clientpositive/repl_3_exim_metadata.q | 1 +
.../queries/clientpositive/repl_4_exim_nocolstat.q | 1 +
.../ptest2/conf/deployed/master-mr2.properties | 5 +-
46 files changed, 91 insertions(+), 3871 deletions(-)
diff --git a/bin/ext/strictmanagedmigration.sh b/bin/ext/strictmanagedmigration.sh
deleted file mode 100644
index a24c321..0000000
--- a/bin/ext/strictmanagedmigration.sh
+++ /dev/null
@@ -1,27 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-THISSERVICE=strictmanagedmigration
-export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "
-
-strictmanagedmigration () {
- CLASS=org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration
- HIVE_OPTS=''
- execHiveCmd $CLASS "$@"
-}
-
-strictmanagedmigration_help () {
- strictmanagedmigration "--help"
-}
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ab46865..d58326f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -531,7 +531,7 @@ public class HiveConf extends Configuration {
+ "task increment that would cross the specified limit."),
REPL_PARTITIONS_DUMP_PARALLELISM("hive.repl.partitions.dump.parallelism",100,
"Number of threads that will be used to dump partition data information during repl dump."),
- REPL_DATA_COPY_LAZY("hive.repl.data.copy.lazy", false,
+ REPL_DATA_COPY_LAZY("hive.repl.data.copy.lazy", true,
"Indicates whether replication should run data copy tasks during repl load operation."),
REPL_FILE_LIST_CACHE_SIZE("hive.repl.file.list.cache.size", 10000,
"This config indicates threshold for the maximum number of data copy locations to be kept in memory. \n"
diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/commands/TestCommands.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/commands/TestCommands.java
index ec19c5e..f803bc2 100644
--- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/commands/TestCommands.java
+++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/commands/TestCommands.java
@@ -77,6 +77,7 @@ public class TestCommands {
TestHCatClient.startMetaStoreServer();
hconf = TestHCatClient.getConf();
hconf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,"");
+ hconf.set(HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname, "false");
hconf
.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
index 58b3ab4..a7cd3a6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
@@ -98,6 +98,7 @@ public class BaseReplicationScenariosAcidTables {
put("hive.metastore.disallow.incompatible.col.type.changes", "false");
put("hive.in.repl.test", "true");
put("metastore.warehouse.tenant.colocation", "true");
+ put(HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname, "false");
}};
acidEnableConf.putAll(overrides);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java
index 3f4cacf..24104c7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java
@@ -55,6 +55,7 @@ public class TestExportImport {
conf.set("dfs.client.use.datanode.hostname", "true");
conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
conf.set("hive.repl.include.external.tables", "false");
+ conf.set(HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname, "false");
MiniDFSCluster miniDFSCluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplTableMigrationWithJsonFormat.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplTableMigrationWithJsonFormat.java
deleted file mode 100644
index 5e7bf7e..0000000
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplTableMigrationWithJsonFormat.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse;
-
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
-import org.junit.BeforeClass;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class TestReplTableMigrationWithJsonFormat extends TestReplicationWithTableMigration {
- @BeforeClass
- public static void classLevelSetup() throws Exception {
- Map<String, String> overrides = new HashMap<>();
- overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
- JSONMessageEncoder.class.getCanonicalName());
- internalBeforeClassSetup(overrides);
- }
-}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
index 6d9c31f..4d94975 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
@@ -40,7 +40,7 @@ public class TestReplWithJsonMessageFormat extends TestReplicationScenarios {
Map<String, String> overrides = new HashMap<>();
overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
JSONMessageEncoder.class.getCanonicalName());
- internalBeforeClassSetup(overrides, false);
+ internalBeforeClassSetup(overrides);
}
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index b8e91dd..eb0776d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -130,7 +130,6 @@ public class TestReplicationScenarios {
private static HiveConf hconfMirror;
private static IDriver driverMirror;
private static HiveMetaStoreClient metaStoreClientMirror;
- private static boolean isMigrationTest;
// Make sure we skip backward-compat checking for those tests that don't generate events
@@ -149,10 +148,10 @@ public class TestReplicationScenarios {
HashMap<String, String> overrideProperties = new HashMap<>();
overrideProperties.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
GzipJSONMessageEncoder.class.getCanonicalName());
- internalBeforeClassSetup(overrideProperties, false);
+ internalBeforeClassSetup(overrideProperties);
}
- static void internalBeforeClassSetup(Map<String, String> additionalProperties, boolean forMigration)
+ static void internalBeforeClassSetup(Map<String, String> additionalProperties)
throws Exception {
hconf = new HiveConf(TestReplicationScenarios.class);
String metastoreUri = System.getProperty("test."+MetastoreConf.ConfVars.THRIFT_URIS.getHiveName());
@@ -160,7 +159,6 @@ public class TestReplicationScenarios {
hconf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreUri);
return;
}
- isMigrationTest = forMigration;
hconf.set(MetastoreConf.ConfVars.TRANSACTIONAL_EVENT_LISTENERS.getHiveName(),
DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore
@@ -183,6 +181,7 @@ public class TestReplicationScenarios {
hconf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, true);
hconf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, true);
hconf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE, true);
+ hconf.setBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY, false);
System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
@@ -209,12 +208,6 @@ public class TestReplicationScenarios {
String thriftUri = MetastoreConf.getVar(hconfMirrorServer, MetastoreConf.ConfVars.THRIFT_URIS);
MetastoreConf.setVar(hconfMirror, MetastoreConf.ConfVars.THRIFT_URIS, thriftUri);
- if (forMigration) {
- hconfMirror.setBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES, true);
- hconfMirror.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
- hconfMirror.set(HiveConf.ConfVars.HIVE_TXN_MANAGER.varname,
- "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
- }
driverMirror = DriverFactory.newDriver(hconfMirror);
metaStoreClientMirror = new HiveMetaStoreClient(hconfMirror);
@@ -1858,15 +1851,7 @@ public class TestReplicationScenarios {
InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour
}
- if (isMigrationTest) {
- // as the move is done using a different event, load will be done within a different transaction and thus
- // we will get two records.
- verifyRun("SELECT a from " + replDbName + ".unptned",
- new String[]{unptn_data[0], unptn_data[0]}, driverMirror);
-
- } else {
- verifyRun("SELECT a from " + replDbName + ".unptned", unptn_data[0], driverMirror);
- }
+ verifyRun("SELECT a from " + replDbName + ".unptned", unptn_data[0], driverMirror);
}
@Test
@@ -3046,10 +3031,7 @@ public class TestReplicationScenarios {
// Replicate all the events happened after bootstrap
incrementalLoadAndVerify(dbName, replDbName);
- // migration test is failing as CONCATENATE is not working. Its not creating the merged file.
- if (!isMigrationTest) {
- verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror);
- }
+ verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror);
}
@Test
@@ -3079,11 +3061,8 @@ public class TestReplicationScenarios {
// Replicate all the events happened so far
incrementalLoadAndVerify(dbName, replDbName);
- // migration test is failing as CONCATENATE is not working. Its not creating the merged file.
- if (!isMigrationTest) {
- verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
- verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror);
- }
+ verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
+ verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror);
}
@Test
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index b1c02e5..a7a5bdf 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -103,6 +103,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
put("hive.metastore.disallow.incompatible.col.type.changes", "false");
put("metastore.warehouse.tenant.colocation", "true");
put("hive.in.repl.test", "true");
+ put(HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname, "false");
}};
acidEnableConf.putAll(overrides);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index cc1701f..a5678f2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -77,6 +77,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
UserGroupInformation.getCurrentUser().getUserName());
+ overrides.put(HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname, "false");
internalBeforeClassSetup(overrides, TestReplicationScenarios.class);
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
deleted file mode 100644
index 0645cef..0000000
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
+++ /dev/null
@@ -1,577 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
-import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
-import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
-import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
-
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.BeforeClass;
-import org.junit.AfterClass;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.junit.rules.TestName;
-import static org.apache.hadoop.hive.ql.io.AcidUtils.isFullAcidTable;
-import static org.apache.hadoop.hive.ql.io.AcidUtils.isTransactionalTable;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * TestReplicationWithTableMigration - test replication for Hive2 to Hive3 (Strict managed tables)
- */
-public class TestReplicationWithTableMigration {
- private final static String AVRO_SCHEMA_FILE_NAME = "avro_table.avsc";
- private static String fullyQualifiedReplicaExternalBase;
-
- @Rule
- public final TestName testName = new TestName();
-
- protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationWithTableMigration.class);
- private static WarehouseInstance primary, replica;
- private String primaryDbName, replicatedDbName;
- private Path avroSchemaFile = null;
-
- @BeforeClass
- public static void classLevelSetup() throws Exception {
- HashMap<String, String> overrideProperties = new HashMap<>();
- overrideProperties.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
- GzipJSONMessageEncoder.class.getCanonicalName());
- internalBeforeClassSetup(overrideProperties);
- }
-
- static void internalBeforeClassSetup(Map<String, String> overrideConfigs) throws Exception {
- HiveConf conf = new HiveConf(TestReplicationWithTableMigration.class);
- conf.set("dfs.client.use.datanode.hostname", "true");
- conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
- MiniDFSCluster miniDFSCluster =
- new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
- final DistributedFileSystem fs = miniDFSCluster.getFileSystem();
- HashMap<String, String> hiveConfigs = new HashMap<String, String>() {{
- put("fs.defaultFS", fs.getUri().toString());
- put("hive.support.concurrency", "true");
- put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
- put("hive.metastore.client.capability.check", "false");
- put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
- put("hive.strict.checks.bucketing", "false");
- put("hive.mapred.mode", "nonstrict");
- put("mapred.input.dir.recursive", "true");
- put("hive.metastore.disallow.incompatible.col.type.changes", "false");
- put("hive.strict.managed.tables", "true");
- }};
-
- HashMap<String, String> configsForPrimary = new HashMap<String, String>() {{
- put("fs.defaultFS", fs.getUri().toString());
- put("hive.metastore.client.capability.check", "false");
- put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
- put("hive.strict.checks.bucketing", "false");
- put("hive.mapred.mode", "nonstrict");
- put("mapred.input.dir.recursive", "true");
- put("hive.metastore.disallow.incompatible.col.type.changes", "false");
- put("hive.support.concurrency", "false");
- put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
- put("hive.strict.managed.tables", "false");
- put("hive.stats.autogather", "true");
- put("hive.stats.column.autogather", "true");
- }};
- configsForPrimary.putAll(overrideConfigs);
- primary = new WarehouseInstance(LOG, miniDFSCluster, configsForPrimary);
- hiveConfigs.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
- replica = new WarehouseInstance(LOG, miniDFSCluster, hiveConfigs);
- fullyQualifiedReplicaExternalBase = miniDFSCluster.getFileSystem().getFileStatus(
- new Path("/")).getPath().toString();
- }
-
- private static Path createAvroSchemaFile(FileSystem fs, Path testPath) throws IOException {
- Path schemaFile = new Path(testPath, AVRO_SCHEMA_FILE_NAME);
- String[] schemaVals = new String[] { "{",
- " \"type\" : \"record\",",
- " \"name\" : \"table1\",",
- " \"doc\" : \"Sqoop import of table1\",",
- " \"fields\" : [ {",
- " \"name\" : \"col1\",",
- " \"type\" : [ \"null\", \"string\" ],",
- " \"default\" : null,",
- " \"columnName\" : \"col1\",",
- " \"sqlType\" : \"12\"",
- " }, {",
- " \"name\" : \"col2\",",
- " \"type\" : [ \"null\", \"long\" ],",
- " \"default\" : null,",
- " \"columnName\" : \"col2\",",
- " \"sqlType\" : \"13\"",
- " } ],",
- " \"tableName\" : \"table1\"",
- "}"
- };
-
- try (FSDataOutputStream stream = fs.create(schemaFile)) {
- for (String line : schemaVals) {
- stream.write((line + "\n").getBytes());
- }
- }
- fs.deleteOnExit(schemaFile);
- return schemaFile;
- }
-
- @AfterClass
- public static void classLevelTearDown() throws IOException {
- primary.close();
- replica.close();
- }
-
- @Before
- public void setup() throws Throwable {
- primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis();
- replicatedDbName = "replicated_" + primaryDbName;
- primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
- SOURCE_OF_REPLICATION + "' = '1,2,3')");
- if (avroSchemaFile == null) {
- Path testPath = new Path("/tmp/avro_schema/definition/" + System.nanoTime());
- DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
- fs.mkdirs(testPath, new FsPermission("777"));
- avroSchemaFile = PathBuilder.fullyQualifiedHDFSUri(createAvroSchemaFile(fs, testPath), fs);
- }
- }
-
- @After
- public void tearDown() throws Throwable {
- primary.run("drop database if exists " + primaryDbName + " cascade");
- replica.run("drop database if exists " + replicatedDbName + " cascade");
- }
-
- private WarehouseInstance.Tuple prepareDataAndDump(String primaryDbName, String fromReplId) throws Throwable {
- WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
- .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ")
- .run("insert into tacid values(1)")
- .run("insert into tacid values(2)")
- .run("insert into tacid values(3)")
- .run(
- "create table tacidpart (place string) partitioned by (country string) clustered by(place) "
- +
- "into 3 buckets stored as orc ")
- .run("alter table tacidpart add partition(country='france')")
- .run("insert into tacidpart partition(country='india') values('mumbai')")
- .run("insert into tacidpart partition(country='us') values('sf')")
- .run("insert into tacidpart partition(country='france') values('paris')")
- .run(
- "create table tflat (rank int) stored as orc tblproperties(\"transactional\"=\"false\")")
- .run("insert into tflat values(11)")
- .run("insert into tflat values(22)")
- .run("create table tflattext (id int) ")
- .run("insert into tflattext values(111), (222)")
- .run("create table tflattextpart (id int) partitioned by (country string) ")
- .run("insert into tflattextpart partition(country='india') values(1111), (2222)")
- .run("insert into tflattextpart partition(country='us') values(3333)")
- .run(
- "create table tacidloc (id int) clustered by(id) into 3 buckets stored as orc LOCATION '/tmp/fol' ")
- .run("insert into tacidloc values(1)")
- .run("insert into tacidloc values(2)")
- .run("insert into tacidloc values(3)")
- .run(
- "create table tacidpartloc (place string) partitioned by (country string) clustered by(place) "
- +
- "into 3 buckets stored as orc ")
- .run("alter table tacidpartloc add partition(country='france') LOCATION '/tmp/fol/part'")
- .run("insert into tacidpartloc partition(country='india') values('mumbai')")
- .run("insert into tacidpartloc partition(country='us') values('sf')")
- .run("insert into tacidpartloc partition(country='france') values('paris')")
- .run(
- "create table avro_table ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' "
- + "stored as avro tblproperties ('avro.schema.url'='" + avroSchemaFile.toUri()
- .toString() + "')")
- .run("insert into avro_table values ('str1', 10)")
- .run(
- "create table avro_table_part partitioned by (country string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' "
- + "stored as avro tblproperties ('avro.schema.url'='" + avroSchemaFile.toUri()
- .toString() + "')")
- .run("insert into avro_table_part partition (country='india') values ('another', 13)")
- .dump(primaryDbName);
- assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacid")));
- assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidpart")));
- assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tflat")));
- assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tflattext")));
- assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tflattextpart")));
- assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidloc")));
- assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidpartloc")));
- assertAvroTableState(primaryDbName, "avro_table", "avro_table_part");
- assertAvroTableState(primaryDbName, "avro_table_part");
- return tuple;
- }
-
- private void assertAvroTableState(String primaryDbName, String... tableNames) throws Exception {
- for (String tableName : tableNames) {
- Table avroTable = primary.getTable(primaryDbName, tableName);
- assertFalse(isTransactionalTable(avroTable));
- assertFalse(MetaStoreUtils.isExternalTable(avroTable));
- }
- }
-
- private void verifyLoadExecution(String replicatedDbName, String lastReplId) throws Throwable {
- replica.run("use " + replicatedDbName)
- .run("show tables")
- .verifyResults(new String[] {"tacid", "tacidpart", "tflat", "tflattext", "tflattextpart",
- "tacidloc", "tacidpartloc", "avro_table", "avro_table_part" })
- .run("repl status " + replicatedDbName)
- .verifyResult(lastReplId)
- .run("select id from tacid order by id")
- .verifyResults(new String[]{"1", "2", "3"})
- .run("select country from tacidpart order by country")
- .verifyResults(new String[] {"france", "india", "us"})
- .run("select rank from tflat order by rank")
- .verifyResults(new String[] {"11", "22"})
- .run("select id from tflattext order by id")
- .verifyResults(new String[] {"111", "222"})
- .run("select id from tflattextpart order by id")
- .verifyResults(new String[] {"1111", "2222", "3333"})
- .run("select id from tacidloc order by id")
- .verifyResults(new String[]{"1", "2", "3"})
- .run("select country from tacidpartloc order by country")
- .verifyResults(new String[] {"france", "india", "us"})
- .run("select col1 from avro_table")
- .verifyResults(new String[] { "str1" })
- .run("select col1 from avro_table_part")
- .verifyResults(new String[] { "another" });
-
- assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacid")));
- assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidpart")));
- assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tflat")));
- assertTrue(!isFullAcidTable(replica.getTable(replicatedDbName, "tflattext")));
- assertTrue(!isFullAcidTable(replica.getTable(replicatedDbName, "tflattextpart")));
- assertTrue(isTransactionalTable(replica.getTable(replicatedDbName, "tflattext")));
- assertTrue(isTransactionalTable(replica.getTable(replicatedDbName, "tflattextpart")));
- assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidloc")));
- assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidpartloc")));
- assertTablePath(replicatedDbName, "avro_table");
- assertPartitionPath(replicatedDbName, "avro_table_part");
- }
-
- private void assertPartitionPath(String replicatedDbName, String tableName) throws Exception {
- Path tablePath = assertTablePath(replicatedDbName, tableName);
- List<Partition> partitions = replica.getAllPartitions(replicatedDbName, tableName);
- assertEquals(1, partitions.size());
- String actualPartitionPath = partitions.iterator().next().getSd().getLocation().toLowerCase();
- String expectedPartitionPath = new PathBuilder(tablePath.toString())
- .addDescendant("country=india").build().toUri().toString().toLowerCase();
- assertEquals(expectedPartitionPath, actualPartitionPath);
- }
-
- private Path assertTablePath(String replicatedDbName, String tableName) throws Exception {
- Table avroTable = replica.getTable(replicatedDbName, tableName);
- assertTrue(MetaStoreUtils.isExternalTable(avroTable));
- Path tablePath = new PathBuilder(replica.externalTableWarehouseRoot.toString())
- .addDescendant(replicatedDbName + ".db").addDescendant(tableName).build();
- String expectedTablePath = tablePath.toUri().toString().toLowerCase();
- String actualTablePath = avroTable.getSd().getLocation().toLowerCase();
- assertEquals(expectedTablePath, actualTablePath);
- return tablePath;
- }
-
- private void loadWithFailureInAddNotification(String tbl) throws Throwable {
- BehaviourInjection<CallerArguments, Boolean> callerVerifier
- = new BehaviourInjection<CallerArguments, Boolean>() {
- @Nullable
- @Override
- public Boolean apply(@Nullable InjectableBehaviourObjectStore.CallerArguments args) {
- injectionPathCalled = true;
- if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.constraintTblName != null)) {
- LOG.warn("Verifier - DB: " + args.dbName
- + " Constraint Table: " + args.constraintTblName);
- return false;
- }
- if (args.tblName != null) {
- LOG.warn("Verifier - Table: " + args.tblName);
- return args.tblName.equalsIgnoreCase(tbl);
- }
- return true;
- }
- };
- InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
- try {
- replica.loadFailure(replicatedDbName, primaryDbName);
- } finally {
- InjectableBehaviourObjectStore.resetCallerVerifier();
- }
- callerVerifier.assertInjectionsPerformed(true, false);
- }
-
- @Test
- public void testBootstrapLoadMigrationManagedToAcid() throws Throwable {
- WarehouseInstance.Tuple tuple = prepareDataAndDump(primaryDbName, null);
- replica.load(replicatedDbName, primaryDbName);
- verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
- }
-
- @Test
- public void testIncrementalLoadMigrationManagedToAcid() throws Throwable {
- WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
- replica.load(replicatedDbName, primaryDbName);
- tuple = prepareDataAndDump(primaryDbName, tuple.lastReplicationId);
- replica.load(replicatedDbName, primaryDbName);
- verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
- }
-
- @Test
- public void testIncrementalLoadMigrationManagedToAcidFailure() throws Throwable {
- WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
- replica.load(replicatedDbName, primaryDbName);
- tuple = prepareDataAndDump(primaryDbName, tuple.lastReplicationId);
- loadWithFailureInAddNotification("tacid");
- replica.run("use " + replicatedDbName)
- .run("show tables like tacid")
- .verifyResult(null);
- replica.load(replicatedDbName, primaryDbName);
- verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
- }
-
- @Test
- public void testIncrementalLoadMigrationManagedToAcidFailurePart() throws Throwable {
- WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
- replica.load(replicatedDbName, primaryDbName);
- tuple = prepareDataAndDump(primaryDbName, tuple.lastReplicationId);
- loadWithFailureInAddNotification("tacidpart");
- replica.run("use " + replicatedDbName)
- .run("show tables like tacidpart")
- .verifyResult(null);
- replica.load(replicatedDbName, primaryDbName);
- verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
- }
-
- @Test
- public void testIncrementalLoadMigrationManagedToAcidAllOp() throws Throwable {
- WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
- replica.load(replicatedDbName, primaryDbName)
- .run("REPL STATUS " + replicatedDbName)
- .verifyResult(bootStrapDump.lastReplicationId);
- List<String> selectStmtList = new ArrayList<>();
- List<String[]> expectedValues = new ArrayList<>();
- String tableName = testName.getMethodName() + "testInsert";
- String tableNameMM = tableName + "_MM";
-
- ReplicationTestUtils.appendInsert(primary, primaryDbName, null,
- tableName, tableNameMM, selectStmtList, expectedValues);
- ReplicationTestUtils.appendTruncate(primary, primaryDbName,
- null, selectStmtList, expectedValues);
- ReplicationTestUtils.appendInsertIntoFromSelect(primary, primaryDbName,
- null, tableName, tableNameMM, selectStmtList, expectedValues);
- ReplicationTestUtils.appendCreateAsSelect(primary, primaryDbName,
- null, tableName, tableNameMM, selectStmtList, expectedValues);
- ReplicationTestUtils.appendImport(primary, primaryDbName,
- null, tableName, tableNameMM, selectStmtList, expectedValues);
- ReplicationTestUtils.appendInsertOverwrite(primary, primaryDbName,
- null, tableName, tableNameMM, selectStmtList, expectedValues);
- ReplicationTestUtils.appendLoadLocal(primary, primaryDbName,
- null, tableName, tableNameMM, selectStmtList, expectedValues);
- ReplicationTestUtils.appendInsertUnion(primary, primaryDbName,
- null, tableName, tableNameMM, selectStmtList, expectedValues);
- ReplicationTestUtils.appendAlterTable(primary, primaryDbName,
- null, selectStmtList, expectedValues);
-
- ReplicationTestUtils.verifyIncrementalLoad(primary, replica, primaryDbName,
- replicatedDbName, selectStmtList, expectedValues, bootStrapDump.lastReplicationId);
- }
-
- @Test
- public void testBootstrapConvertedExternalTableAutoPurgeDataOnDrop() throws Throwable {
- WarehouseInstance.Tuple bootstrap = primary.run("use " + primaryDbName)
- .run("create table avro_tbl partitioned by (country string) ROW FORMAT SERDE "
- + "'org.apache.hadoop.hive.serde2.avro.AvroSerDe' stored as avro "
- + "tblproperties ('avro.schema.url'='" + avroSchemaFile.toUri().toString() + "')")
- .run("insert into avro_tbl partition (country='india') values ('another', 13)")
- .dump(primaryDbName);
-
- replica.load(replicatedDbName, primaryDbName);
- Path dataLocation = assertTablePath(replicatedDbName, "avro_tbl");
-
- WarehouseInstance.Tuple incremental = primary.run("use " + primaryDbName)
- .run("drop table avro_tbl")
- .dump(primaryDbName);
- replica.load(replicatedDbName, primaryDbName);
-
- // After drop, the external table data location should be auto deleted as it is converted one.
- assertFalse(replica.miniDFSCluster.getFileSystem().exists(dataLocation));
- }
-
- @Test
- public void testIncConvertedExternalTableAutoDeleteDataDirOnDrop() throws Throwable {
- WarehouseInstance.Tuple bootstrap = primary.dump(primaryDbName);
- replica.load(replicatedDbName, primaryDbName);
-
- primary.run("use " + primaryDbName)
- .run("create table avro_tbl ROW FORMAT SERDE "
- + "'org.apache.hadoop.hive.serde2.avro.AvroSerDe' stored as avro "
- + "tblproperties ('avro.schema.url'='" + avroSchemaFile.toUri().toString() + "')")
- .run("insert into avro_tbl values ('str', 13)")
- .dump(primaryDbName);
- replica.load(replicatedDbName, primaryDbName);
-
- // Data location is valid and is under default external warehouse directory.
- Table avroTable = replica.getTable(replicatedDbName, "avro_tbl");
- assertTrue(MetaStoreUtils.isExternalTable(avroTable));
- Path dataLocation = new Path(avroTable.getSd().getLocation());
- assertTrue(replica.miniDFSCluster.getFileSystem().exists(dataLocation));
-
- primary.run("use " + primaryDbName)
- .run("drop table avro_tbl")
- .dump(primaryDbName);
- replica.load(replicatedDbName, primaryDbName);
-
- // After drop, the external table data location should be auto deleted as it is converted one.
- assertFalse(replica.miniDFSCluster.getFileSystem().exists(dataLocation));
- }
-
- @Test
- public void testBootstrapLoadMigrationToAcidWithMoveOptimization() throws Throwable {
- List<String> withConfigs =
- Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
- WarehouseInstance.Tuple tuple = prepareDataAndDump(primaryDbName, null);
- replica.load(replicatedDbName, primaryDbName, withConfigs);
- verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
- }
-
- @Test
- public void testIncrementalLoadMigrationToAcidWithMoveOptimization() throws Throwable {
- List<String> withConfigs =
- Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
- WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
- replica.load(replicatedDbName, primaryDbName);
- tuple = prepareDataAndDump(primaryDbName, tuple.lastReplicationId);
- replica.load(replicatedDbName, primaryDbName, withConfigs);
- verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
- }
-
- @Test
- public void dynamicallyConvertManagedToExternalTable() throws Throwable {
- // With Strict managed disabled but Db enabled for replication, it is not possible to convert
- // external table to managed table.
- primary.run("use " + primaryDbName)
- .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc ")
- .run("insert into t1 values(1)")
- .run("create table t2 partitioned by (country string) ROW FORMAT SERDE "
- + "'org.apache.hadoop.hive.serde2.avro.AvroSerDe' stored as avro "
- + "tblproperties ('avro.schema.url'='" + avroSchemaFile.toUri().toString() + "')")
- .run("insert into t2 partition (country='india') values ('another', 13)")
- .runFailure("alter table t1 set tblproperties('EXTERNAL'='true')")
- .runFailure("alter table t2 set tblproperties('EXTERNAL'='true')");
- }
-
- @Test
- public void dynamicallyConvertExternalToManagedTable() throws Throwable {
- // With Strict managed disabled but Db enabled for replication, it is not possible to convert
- // external table to managed table.
- primary.run("use " + primaryDbName)
- .run("create external table t1 (id int) stored as orc")
- .run("insert into table t1 values (1)")
- .run("create external table t2 (place string) partitioned by (country string)")
- .run("insert into table t2 partition(country='india') values ('bangalore')")
- .runFailure("alter table t1 set tblproperties('EXTERNAL'='false')")
- .runFailure("alter table t2 set tblproperties('EXTERNAL'='false')");
- }
-
- @Test
- public void testMigrationWithUpgrade() throws Throwable {
- WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
- .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ")
- .run("insert into tacid values (3)")
- .run("create table texternal (id int) ")
- .run("insert into texternal values (1)")
- .dump(primaryDbName);
- replica.load(replicatedDbName, primaryDbName)
- .run("use " + replicatedDbName)
- .run("repl status " + replicatedDbName)
- .verifyResult(tuple.lastReplicationId)
- .run("select id from tacid")
- .verifyResult("3")
- .run("select id from texternal")
- .verifyResult("1");
-
- assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacid")));
- assertFalse(MetaStoreUtils.isExternalTable(replica.getTable(replicatedDbName, "texternal")));
-
- // forcefully (setting db property) alter the table type. For acid table, set the bootstrap acid table to true. For
- // external table, the alter event should alter the table type at target cluster and then distcp should copy the
- // files. This is done to mock the upgrade done using HiveStrictManagedMigration.
- HiveConf hiveConf = primary.getConf();
-
- try {
- //Set the txn config required for this test. This will not enable the full acid functionality in the warehouse.
- hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
- hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
-
- primary.run("use " + primaryDbName)
- .run("alter database " + primaryDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '')")
- .run("insert into tacid values (1)")
- .run("insert into texternal values (2)")
- .run("alter table tacid set tblproperties ('transactional'='true')")
- .run("alter table texternal SET TBLPROPERTIES('EXTERNAL'='TRUE')")
- .run("insert into texternal values (3)")
- .run("alter database " + primaryDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')");
- } finally {
- hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
- hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
- }
-
- assertTrue(isFullAcidTable(primary.getTable(primaryDbName, "tacid")));
- assertTrue(MetaStoreUtils.isExternalTable(primary.getTable(primaryDbName, "texternal")));
-
- List<String> withConfigs = new ArrayList();
- withConfigs.add("'hive.repl.bootstrap.acid.tables'='true'");
- withConfigs.add("'hive.repl.dump.include.acid.tables'='true'");
- withConfigs.add("'hive.repl.include.external.tables'='true'");
- withConfigs.add("'hive.repl.replica.external.table.base.dir' = '" + fullyQualifiedReplicaExternalBase + "'");
- withConfigs.add("'hive.distcp.privileged.doAs' = '" + UserGroupInformation.getCurrentUser().getUserName() + "'");
- tuple = primary.dump(primaryDbName, withConfigs);
- replica.load(replicatedDbName, primaryDbName, withConfigs);
- replica.run("use " + replicatedDbName)
- .run("repl status " + replicatedDbName)
- .verifyResult(tuple.lastReplicationId)
- .run("select id from tacid")
- .verifyResults(new String[] { "1", "3" })
- .run("select id from texternal")
- .verifyResults(new String[] { "1", "2", "3" });
- assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacid")));
- assertTrue(MetaStoreUtils.isExternalTable(replica.getTable(replicatedDbName, "texternal")));
- }
-}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java
deleted file mode 100644
index f98067c..0000000
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java
+++ /dev/null
@@ -1,456 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
-import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
-import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
-import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hive.hcatalog.listener.DbNotificationListener;
-import org.junit.*;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.*;
-
-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
-import static org.apache.hadoop.hive.ql.io.AcidUtils.isFullAcidTable;
-import static org.apache.hadoop.hive.ql.io.AcidUtils.isTransactionalTable;
-import static org.junit.Assert.*;
-
-/**
- * TestReplicationWithTableMigrationEx - test replication for Hive2 to Hive3 (Strict managed tables)
- */
-public class TestReplicationWithTableMigrationEx {
- @Rule
- public final TestName testName = new TestName();
-
- protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationWithTableMigrationEx.class);
- private static WarehouseInstance primary, replica;
- private String primaryDbName, replicatedDbName;
-
- @BeforeClass
- public static void classLevelSetup() throws Exception {
- HashMap<String, String> overrideProperties = new HashMap<>();
- internalBeforeClassSetup(overrideProperties);
- }
-
- static void internalBeforeClassSetup(Map<String, String> overrideConfigs) throws Exception {
- HiveConf conf = new HiveConf(TestReplicationWithTableMigrationEx.class);
- conf.set("dfs.client.use.datanode.hostname", "true");
- conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
- MiniDFSCluster miniDFSCluster =
- new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
- final DistributedFileSystem fs = miniDFSCluster.getFileSystem();
- HashMap<String, String> hiveConfigs = new HashMap<String, String>() {{
- put("fs.defaultFS", fs.getUri().toString());
- put("hive.support.concurrency", "true");
- put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
- put("hive.metastore.client.capability.check", "false");
- put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
- put("hive.strict.checks.bucketing", "false");
- put("hive.mapred.mode", "nonstrict");
- put("mapred.input.dir.recursive", "true");
- put("hive.metastore.disallow.incompatible.col.type.changes", "false");
- put("hive.strict.managed.tables", "true");
- put("hive.metastore.transactional.event.listeners", "");
- }};
-
- HashMap<String, String> configsForPrimary = new HashMap<String, String>() {{
- put("fs.defaultFS", fs.getUri().toString());
- put("hive.metastore.client.capability.check", "false");
- put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
- put("hive.strict.checks.bucketing", "false");
- put("hive.mapred.mode", "nonstrict");
- put("mapred.input.dir.recursive", "true");
- put("hive.metastore.disallow.incompatible.col.type.changes", "false");
- put("hive.support.concurrency", "false");
- put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
- put("hive.strict.managed.tables", "false");
- }};
- configsForPrimary.putAll(overrideConfigs);
- primary = new WarehouseInstance(LOG, miniDFSCluster, configsForPrimary);
- hiveConfigs.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
- replica = new WarehouseInstance(LOG, miniDFSCluster, hiveConfigs);
- }
-
- @AfterClass
- public static void classLevelTearDown() throws IOException {
- primary.close();
- replica.close();
- }
-
- @Before
- public void setup() throws Throwable {
- primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis();
- replicatedDbName = "replicated_" + primaryDbName;
- primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
- SOURCE_OF_REPLICATION + "' = '1,2,3')");
- }
-
- @After
- public void tearDown() throws Throwable {
- primary.run("drop database if exists " + primaryDbName + " cascade");
- replica.run("drop database if exists " + replicatedDbName + " cascade");
- }
-
- private void prepareData(String primaryDbName) throws Throwable {
- primary.run("use " + primaryDbName)
- .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ")
- .run("create table tacidpart (place string) partitioned by (country string) clustered by(place) "
- + "into 3 buckets stored as orc ")
- .run("insert into tacid values(1)")
- .run("insert into tacid values(2)")
- .run("insert into tacid values(3)")
- .run("alter table tacidpart add partition(country='france')")
- .run("insert into tacidpart partition(country='india') values('mumbai')")
- .run("insert into tacidpart partition(country='us') values('sf')")
- .run("insert into tacidpart partition(country='france') values('paris')");
- assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacid")));
- assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidpart")));
- }
-
- private void verifyLoadExecution(String replicatedDbName, String lastReplId) throws Throwable {
- replica.run("use " + replicatedDbName)
- .run("show tables")
- .verifyResults(new String[] {"tacid", "tacidpart"})
- .run("repl status " + replicatedDbName)
- .verifyResult(lastReplId)
- .run("select count(*) from tacid")
- .verifyResult("3")
- .run("select id from tacid order by id")
- .verifyResults(new String[]{"1", "2", "3"})
- .run("select count(*) from tacidpart")
- .verifyResult("3")
- .run("select country from tacidpart order by country")
- .verifyResults(new String[] {"france", "india", "us"});
-
- assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacid")));
- assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidpart")));
- }
-
- private WarehouseInstance.Tuple dumpWithLastEventIdHacked(int eventId) throws Throwable {
- BehaviourInjection<CurrentNotificationEventId, CurrentNotificationEventId> callerVerifier
- = new BehaviourInjection<CurrentNotificationEventId, CurrentNotificationEventId>() {
- @Override
- public CurrentNotificationEventId apply(CurrentNotificationEventId id) {
- try {
- LOG.warn("GetCurrentNotificationEventIdBehaviour called");
- injectionPathCalled = true;
- // keep events to reply during incremental
- id.setEventId(eventId);
- return id;
- } catch (Throwable throwable) {
- throwable.printStackTrace();
- return null;
- }
- }
- };
-
- InjectableBehaviourObjectStore.setGetCurrentNotificationEventIdBehaviour(callerVerifier);
- try {
- return primary.dump(primaryDbName);
- } finally {
- InjectableBehaviourObjectStore.resetGetCurrentNotificationEventIdBehaviour();
- callerVerifier.assertInjectionsPerformed(true, false);
- }
- }
-
- @Test
- public void testConcurrentOpDuringBootStrapDumpCreateTableReplay() throws Throwable {
- prepareData(primaryDbName);
-
- // dump with operation after last repl id is fetched.
- WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(2);
- replica.loadWithoutExplain(replicatedDbName, primaryDbName);
- verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
- assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
-
- // next incremental dump
- tuple = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, primaryDbName);
- verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
- assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
- }
-
- @Test
- public void testConcurrentOpDuringBootStrapDumpInsertReplay() throws Throwable {
- prepareData(primaryDbName);
-
- // dump with operation after last repl id is fetched.
- WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(4);
- replica.loadWithoutExplain(replicatedDbName, primaryDbName);
- verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
- assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
-
- // next incremental dump
- tuple = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, primaryDbName);
- verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
- assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
- }
-
- @Test
- public void testTableLevelDumpMigration() throws Throwable {
- WarehouseInstance.Tuple tuple = primary
- .run("use " + primaryDbName)
- .run("create table t1 (i int, j int)")
- .dump(primaryDbName+".'t1'");
- replica.run("create database " + replicatedDbName);
- replica.loadWithoutExplain(replicatedDbName, primaryDbName);
- assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
-
- tuple = primary.run("use " + primaryDbName)
- .run("insert into t1 values (1, 2)")
- .dump(primaryDbName+".'t1'");
- replica.loadWithoutExplain(replicatedDbName, primaryDbName);
- assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
- }
-
- @Test
- public void testConcurrentOpDuringBootStrapDumpInsertOverwrite() throws Throwable {
- primary.run("use " + primaryDbName)
- .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ")
- .run("insert into tacid values(1)")
- .run("insert into tacid values(2)")
- .run("insert into tacid values(3)")
- .run("insert overwrite table tacid values(4)")
- .run("insert into tacid values(5)");
-
- // dump with operation after last repl id is fetched.
- WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(2);
- replica.loadWithoutExplain(replicatedDbName, primaryDbName);
- replica.run("use " + replicatedDbName)
- .run("show tables")
- .verifyResults(new String[] {"tacid"})
- .run("repl status " + replicatedDbName)
- .verifyResult(tuple.lastReplicationId)
- .run("select count(*) from tacid")
- .verifyResult("2")
- .run("select id from tacid order by id")
- .verifyResults(new String[]{"4", "5"});
- assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
-
- // next incremental dump
- tuple = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, primaryDbName);
- replica.run("use " + replicatedDbName)
- .run("show tables")
- .verifyResults(new String[] {"tacid"})
- .run("repl status " + replicatedDbName)
- .verifyResult(tuple.lastReplicationId)
- .run("select count(*) from tacid")
- .verifyResult("2")
- .run("select id from tacid order by id")
- .verifyResults(new String[]{"4", "5",});
- assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
- }
-
- private void loadWithFailureInAddNotification(String tbl) throws Throwable {
- BehaviourInjection<InjectableBehaviourObjectStore.CallerArguments, Boolean> callerVerifier
- = new BehaviourInjection<InjectableBehaviourObjectStore.CallerArguments, Boolean>() {
- @Nullable
- @Override
- public Boolean apply(@Nullable InjectableBehaviourObjectStore.CallerArguments args) {
- injectionPathCalled = true;
- LOG.warn("InjectableBehaviourObjectStore called for Verifier - Table: " + args.tblName);
- if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.constraintTblName != null)) {
- LOG.warn("Verifier - DB: " + args.dbName
- + " Constraint Table: " + args.constraintTblName);
- return false;
- }
- if (args.tblName != null) {
- LOG.warn("Verifier - Table: " + args.tblName);
- return !args.tblName.equalsIgnoreCase(tbl);
- }
- return true;
- }
- };
- InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
- try {
- List<String> withClause = Collections.singletonList("'hive.metastore.transactional.event.listeners'='"
- + DbNotificationListener.class.getCanonicalName() + "'");
- replica.loadFailure(replicatedDbName, primaryDbName, withClause);
- } finally {
- InjectableBehaviourObjectStore.resetCallerVerifier();
- }
- callerVerifier.assertInjectionsPerformed(true, false);
- }
-
- @Test
- public void testIncLoadPenFlagPropAlterDB() throws Throwable {
- prepareData(primaryDbName);
-
- // dump with operation after last repl id is fetched.
- WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(4);
- replica.loadWithoutExplain(replicatedDbName, primaryDbName);
- verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
- assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
- assertFalse(ReplUtils.isFirstIncPending(primary.getDatabase(primaryDbName).getParameters()));
-
- primary.run("use " + primaryDbName)
- .run("alter database " + primaryDbName + " set dbproperties('dummy_key'='dummy_val')")
- .run("create table tbl_temp (fld int)")
- .dump(primaryDbName);
-
- loadWithFailureInAddNotification("tbl_temp");
- Database replDb = replica.getDatabase(replicatedDbName);
- assertTrue(ReplUtils.isFirstIncPending(replDb.getParameters()));
- assertFalse(ReplUtils.isFirstIncPending(primary.getDatabase(primaryDbName).getParameters()));
- assertTrue(replDb.getParameters().get("dummy_key").equalsIgnoreCase("dummy_val"));
-
- replica.loadWithoutExplain(replicatedDbName, primaryDbName);
- assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
- }
-
- @Test
- public void testIncLoadPenFlagWithMoveOptimization() throws Throwable {
- List<String> withClause = Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
-
- prepareData(primaryDbName);
-
- // dump with operation after last repl id is fetched.
- WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(4);
- replica.load(replicatedDbName, primaryDbName, withClause);
- verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
- assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
-
- // next incremental dump
- tuple = primary.dump(primaryDbName);
- replica.load(replicatedDbName, primaryDbName, withClause);
- assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
- }
-
- private void verifyUserName(String userName) throws Throwable {
- assertTrue(userName.equalsIgnoreCase(primary.getTable(primaryDbName, "tbl_own").getOwner()));
- assertTrue(userName.equalsIgnoreCase(replica.getTable(replicatedDbName, "tbl_own").getOwner()));
- assertTrue(userName.equalsIgnoreCase(primary.getTable(primaryDbName, "tacid").getOwner()));
- assertTrue(userName.equalsIgnoreCase(replica.getTable(replicatedDbName, "tacid").getOwner()));
- assertTrue(userName.equalsIgnoreCase(primary.getTable(primaryDbName, "tacidpart").getOwner()));
- assertTrue(userName.equalsIgnoreCase(replica.getTable(replicatedDbName, "tacidpart").getOwner()));
- assertTrue(userName.equalsIgnoreCase(primary.getTable(primaryDbName, "tbl_part").getOwner()));
- assertTrue(userName.equalsIgnoreCase(replica.getTable(replicatedDbName, "tbl_part").getOwner()));
- assertTrue(userName.equalsIgnoreCase(primary.getTable(primaryDbName, "view_own").getOwner()));
- assertTrue(userName.equalsIgnoreCase(replica.getTable(replicatedDbName, "view_own").getOwner()));
- }
-
- private void alterUserName(String userName) throws Throwable {
- primary.run("use " + primaryDbName)
- .run("alter table tbl_own set owner USER " + userName)
- .run("alter table tacid set owner USER " + userName)
- .run("alter table tacidpart set owner USER " + userName)
- .run("alter table tbl_part set owner USER " + userName)
- .run("alter table view_own set owner USER " + userName);
- }
-
- @Test
- public void testOnwerPropagation() throws Throwable {
- primary.run("use " + primaryDbName)
- .run("create table tbl_own (fld int)")
- .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ")
- .run("create table tacidpart (place string) partitioned by (country string) clustered by(place) "
- + "into 3 buckets stored as orc ")
- .run("create table tbl_part (fld int) partitioned by (country string)")
- .run("insert into tbl_own values (1)")
- .run("create view view_own as select * from tbl_own");
-
- // test bootstrap
- alterUserName("hive");
- primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, primaryDbName);
- verifyUserName("hive");
-
- // test incremental
- alterUserName("hive1");
- primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, primaryDbName);
- verifyUserName("hive1");
- }
-
- @Test
- public void testOnwerPropagationInc() throws Throwable {
- primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, primaryDbName);
-
- primary.run("use " + primaryDbName)
- .run("create table tbl_own (fld int)")
- .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ")
- .run("create table tacidpart (place string) partitioned by (country string) clustered by(place) "
- + "into 3 buckets stored as orc ")
- .run("create table tbl_part (fld int) partitioned by (country string)")
- .run("insert into tbl_own values (1)")
- .run("create view view_own as select * from tbl_own");
-
- // test incremental when table is getting created in the same load
- alterUserName("hive");
- primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, primaryDbName);
- verifyUserName("hive");
- }
-
- @Test
- public void dynamicallyConvertNonAcidToAcidTable() throws Throwable {
- // Converting a non-ACID table to an ACID table should be prohibited on the source cluster
- // when strict managed tables is set to false.
- primary.run("use " + primaryDbName)
- .run("create table t1 (id int) stored as orc")
- .run("insert into table t1 values (1)")
- .run("create table t2 (place string) partitioned by (country string) stored as orc")
- .run("insert into table t2 partition(country='india') values ('bangalore')")
- .runFailure("alter table t1 set tblproperties('transactional'='true')")
- .runFailure("alter table t2 set tblproperties('transactional'='true')")
- .runFailure("alter table t1 set tblproperties('transactional'='true', " +
- "'transactional_properties'='insert_only')")
- .runFailure("alter table t2 set tblproperties('transactional'='true', " +
- "'transactional_properties'='insert_only')");
-
- }
-
- @Test
- public void prohibitManagedTableLocationChangeOnReplSource() throws Throwable {
- String tmpLocation = "/tmp/" + System.nanoTime();
- primary.miniDFSCluster.getFileSystem().mkdirs(new Path(tmpLocation), new FsPermission("777"));
-
- // For managed tables at the source, the table location shouldn't be changed for a
- // non-partitioned table and the partition location shouldn't be changed for a partitioned table,
- // as the alter event doesn't capture the new file list and may therefore cause data inconsistency.
- // So, if the database is enabled for replication at the source, altering the location of managed
- // tables should be blocked.
- primary.run("use " + primaryDbName)
- .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc ")
- .run("insert into t1 values(1)")
- .run("create table t2 (place string) partitioned by (country string) stored as orc")
- .run("insert into table t2 partition(country='india') values ('bangalore')")
- .runFailure("alter table t1 set location '" + tmpLocation + "'")
- .runFailure("alter table t2 partition(country='india') set location '" + tmpLocation + "'")
- .runFailure("alter table t2 set location '" + tmpLocation + "'");
-
- primary.miniDFSCluster.getFileSystem().delete(new Path(tmpLocation), true);
- }
-}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java
index db2d3a4..44eead0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java
@@ -97,6 +97,7 @@ public class TestStatsReplicationScenarios {
Map<String, String> additionalOverrides = new HashMap<String, String>() {{
put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true");
+ put(HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname, "false");
}};
Map<String, String> replicatedOverrides = new HashMap<>();
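The new REPL_DATA_COPY_LAZY override follows the existing pattern in this test: overrides are keyed by ConfVars.varname and later copied onto a HiveConf. A small sketch of that pattern, assuming only that HiveConf accepts plain set(String, String) (it extends Hadoop's Configuration), is:

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;

final class ConfOverrideSketch {
  static HiveConf withOverrides() {
    Map<String, String> overrides = new HashMap<>();
    // Same override as the hunk above: turn off lazy data copy for these stats-replication runs.
    overrides.put(HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname, "false");

    HiveConf conf = new HiveConf();
    overrides.forEach(conf::set);   // Configuration.set(String, String) applies each entry
    return conf;
  }
}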
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigration.java
deleted file mode 100644
index 8544d32..0000000
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigration.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse;
-
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.rules.TestName;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Tests statistics replication for ACID tables.
- */
-@org.junit.Ignore("HIVE-23986")
-public class TestStatsReplicationScenariosMigration extends TestStatsReplicationScenarios {
- @Rule
- public final TestName testName = new TestName();
-
- @BeforeClass
- public static void classLevelSetup() throws Exception {
- Map<String, String> overrides = new HashMap<>();
- overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
- GzipJSONMessageEncoder.class.getCanonicalName());
-
- Map<String, String> replicaConfigs = new HashMap<String, String>() {{
- put("hive.support.concurrency", "true");
- put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
- put("hive.metastore.client.capability.check", "false");
- put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
- put("hive.strict.checks.bucketing", "false");
- put("hive.mapred.mode", "nonstrict");
- put("mapred.input.dir.recursive", "true");
- put("hive.metastore.disallow.incompatible.col.type.changes", "false");
- put("hive.strict.managed.tables", "true");
- }};
- replicaConfigs.putAll(overrides);
-
- Map<String, String> primaryConfigs = new HashMap<String, String>() {{
- put("hive.metastore.client.capability.check", "false");
- put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
- put("hive.strict.checks.bucketing", "false");
- put("hive.mapred.mode", "nonstrict");
- put("mapred.input.dir.recursive", "true");
- put("hive.metastore.disallow.incompatible.col.type.changes", "false");
- put("hive.support.concurrency", "false");
- put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
- put("hive.strict.managed.tables", "false");
- }};
- primaryConfigs.putAll(overrides);
-
- internalBeforeClassSetup(primaryConfigs, replicaConfigs,
- TestStatsReplicationScenariosMigration.class, true, null);
- }
-}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigrationNoAutogather.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigrationNoAutogather.java
deleted file mode 100644
index 21790c5..0000000
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigrationNoAutogather.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse;
-
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.rules.TestName;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Tests statistics replication for ACID tables.
- */
-@org.junit.Ignore("HIVE-23982")
-public class TestStatsReplicationScenariosMigrationNoAutogather extends TestStatsReplicationScenarios {
- @Rule
- public final TestName testName = new TestName();
-
- @BeforeClass
- public static void classLevelSetup() throws Exception {
- Map<String, String> overrides = new HashMap<>();
- overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
- GzipJSONMessageEncoder.class.getCanonicalName());
-
- Map<String, String> replicaConfigs = new HashMap<String, String>() {{
- put("hive.support.concurrency", "true");
- put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
- put("hive.metastore.client.capability.check", "false");
- put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
- put("hive.strict.checks.bucketing", "false");
- put("hive.mapred.mode", "nonstrict");
- put("mapred.input.dir.recursive", "true");
- put("hive.metastore.disallow.incompatible.col.type.changes", "false");
- put("hive.strict.managed.tables", "true");
- }};
- replicaConfigs.putAll(overrides);
-
- Map<String, String> primaryConfigs = new HashMap<String, String>() {{
- put("hive.metastore.client.capability.check", "false");
- put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
- put("hive.strict.checks.bucketing", "false");
- put("hive.mapred.mode", "nonstrict");
- put("mapred.input.dir.recursive", "true");
- put("hive.metastore.disallow.incompatible.col.type.changes", "false");
- put("hive.support.concurrency", "false");
- put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
- put("hive.strict.managed.tables", "false");
- }};
- primaryConfigs.putAll(overrides);
-
- internalBeforeClassSetup(primaryConfigs, replicaConfigs,
- TestStatsReplicationScenariosMigrationNoAutogather.class, false, null);
- }
-}
diff --git a/llap-common/src/test/org/apache/hadoop/hive/llap/AsyncResponseHandlerTest.java b/llap-common/src/test/org/apache/hadoop/hive/llap/AsyncResponseHandlerTest.java
index d5d24cf..75c68d9 100644
--- a/llap-common/src/test/org/apache/hadoop/hive/llap/AsyncResponseHandlerTest.java
+++ b/llap-common/src/test/org/apache/hadoop/hive/llap/AsyncResponseHandlerTest.java
@@ -19,6 +19,7 @@ import org.apache.hadoop.hive.llap.AsyncPbRpcProxy.ExecuteRequestCallback;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import java.util.Random;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/AbstractAlterTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/AbstractAlterTableOperation.java
index 2ee66e5..0d2cee5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/AbstractAlterTableOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/AbstractAlterTableOperation.java
@@ -139,13 +139,6 @@ public abstract class AbstractAlterTableOperation<T extends AbstractAlterTableDe
environmentContext.putToProperties(HiveMetaHook.ALTER_TABLE_OPERATION_TYPE, alterTable.getType().name());
if (partitions == null) {
long writeId = alterTable.getWriteId() != null ? alterTable.getWriteId() : 0;
- if (alterTable.getReplicationSpec() != null && alterTable.getReplicationSpec().isMigratingToTxnTable()) {
- Long tmpWriteId = ReplUtils.getMigrationCurrentTblWriteId(context.getConf());
- if (tmpWriteId == null) {
- throw new HiveException("DDLTask : Write id is not set in the config by open txn task for migration");
- }
- writeId = tmpWriteId;
- }
context.getDb().alterTable(alterTable.getDbTableName(), table, alterTable.isCascade(), environmentContext, true,
writeId);
} else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java
index 93c0209..d343467 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java
@@ -106,15 +106,7 @@ public class CreateTableOperation extends DDLOperation<CreateTableDesc> {
Long writeId = 0L;
EnvironmentContext environmentContext = null;
if (replicationSpec != null && replicationSpec.isInReplicationScope()) {
- if (replicationSpec.isMigratingToTxnTable()) {
- // For migration, we start the transaction and allocate the write id in the repl txn task.
- writeId = ReplUtils.getMigrationCurrentTblWriteId(context.getConf());
- if (writeId == null) {
- throw new HiveException("DDLTask : Write id is not set in the config by open txn task for migration");
- }
- } else {
- writeId = desc.getReplWriteId();
- }
+ writeId = desc.getReplWriteId();
// In case of replication statistics is obtained from the source, so do not update those
// on replica.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/rename/AlterTableRenamePartitionOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/rename/AlterTableRenamePartitionOperation.java
index 4eff7c1..59bd797 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/rename/AlterTableRenamePartitionOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/rename/AlterTableRenamePartitionOperation.java
@@ -75,13 +75,6 @@ public class AlterTableRenamePartitionOperation extends DDLOperation<AlterTableR
part.setValues(desc.getNewPartSpec());
long writeId = desc.getWriteId();
- if (replicationSpec != null && replicationSpec.isMigratingToTxnTable()) {
- Long tmpWriteId = ReplUtils.getMigrationCurrentTblWriteId(context.getConf());
- if (tmpWriteId == null) {
- throw new HiveException("DDLTask : Write id is not set in the config by open txn task for migration");
- }
- writeId = tmpWriteId;
- }
context.getDb().renamePartition(tbl, oldPartSpec, part, writeId);
Partition newPart = context.getDb().getPartition(tbl, desc.getNewPartSpec(), false);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
index dc6d31a..7844dd6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
@@ -327,17 +327,9 @@ public class ColumnStatsUpdateTask extends Task<ColumnStatsUpdateWork> {
Table tbl = db.getTable(dbName, tblName);
long writeId = work.getWriteId();
// If it's a transactional table on source and target, we will get a valid writeId
- // associated with it. Otherwise it's a non-transactional table on source migrated to a
- // transactional table on target, we need to craft a valid writeId here.
+ // associated with it.
if (AcidUtils.isTransactionalTable(tbl)) {
ValidWriteIdList writeIds;
- if (work.getIsMigratingToTxn()) {
- Long tmpWriteId = ReplUtils.getMigrationCurrentTblWriteId(conf);
- if (tmpWriteId == null) {
- throw new HiveException("DDLTask : Write id is not set in the config by open txn task for migration");
- }
- writeId = tmpWriteId;
- }
// We need a valid writeId list to update column statistics for a transactional table. We
// do not have a valid writeId list which was used to update the column stats on the
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 51de87f..bf5a711 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -393,17 +393,6 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
checkFileFormats(db, tbd, table);
- // for transactional table if write id is not set during replication from a cluster with STRICT_MANAGED set
- // to false then set it now.
- if (tbd.getWriteId() <= 0 && AcidUtils.isTransactionalTable(table.getParameters())) {
- Long writeId = ReplUtils.getMigrationCurrentTblWriteId(conf);
- if (writeId == null) {
- throw new HiveException("MoveTask : Write id is not set in the config by open txn task for migration");
- }
- tbd.setWriteId(writeId);
- tbd.setStmtId(context.getHiveTxnManager().getStmtIdAndIncrement());
- }
-
boolean isFullAcidOp = work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID
&& !tbd.isMmTable(); //it seems that LoadTableDesc has Operation.INSERT only for CTAS...
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 66c3ced..58d8e8c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -62,46 +62,6 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
super();
}
- // If file is already present in base directory, then remove it from the list.
- // Check HIVE-21197 for more detail
- private void updateSrcFileListForDupCopy(FileSystem dstFs, Path toPath, List<ReplChangeManager.FileInfo> srcFiles,
- long writeId, int stmtId) throws IOException {
- FileStatus[] statuses;
- try {
- statuses = dstFs.listStatus(toPath, path -> {
- String fn = path.getName();
- try {
- return dstFs.getFileStatus(path).isDirectory() && fn.startsWith(AcidUtils.BASE_PREFIX);
- } catch (IOException e) {
- LOG.error("File listing failed for " + toPath, e);
- throw new RuntimeException(e.getMessage());
- }
- });
- } catch (FileNotFoundException e) {
- LOG.debug("Path {} does not exist, will be created before copy", toPath);
- return;
- }
-
- if (statuses.length > 1) {
- // If more than one base directory is present, it means one or more replace operations have been done.
- // Any duplicate check after that may cause data loss, as the check would happen against the first
- // base directory, which is no longer valid.
- LOG.info("Number of base directory {} in path {} is more than one. Duplicate check should not be done.",
- statuses, toPath);
- return;
- }
-
- ListIterator<ReplChangeManager.FileInfo> iter = srcFiles.listIterator();
- Path basePath = new Path(toPath, AcidUtils.baseOrDeltaSubdir(true, writeId, writeId, stmtId));
- while (iter.hasNext()) {
- Path filePath = new Path(basePath, iter.next().getSourcePath().getName());
- if (dstFs.exists(filePath)) {
- LOG.debug("File " + filePath + " is already present in base directory. So removing it from the list.");
- iter.remove();
- }
- }
- }
-
@Override
public int execute() {
LOG.debug("ReplCopyTask.execute()");
@@ -159,19 +119,6 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
return 0;
}
}
-
- if (work.isCopyToMigratedTxnTable()) {
- if (isDuplicateCopy(dstFs, toPath, srcFiles)) {
- return 0;
- }
-
- Path modifiedToPath = getModifiedToPath(toPath);
- if (modifiedToPath == null) {
- console.printError("ReplCopyTask : Write id is not set in the config by open txn task for migration");
- return 6;
- }
- toPath = modifiedToPath;
- }
} else {
// This flow is usually taken for IMPORT command
FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath);
@@ -191,18 +138,6 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
LOG.debug("ReplCopyTask :cp:{}=>{}", oneSrc.getPath(), toPath);
srcFiles.add(new ReplChangeManager.FileInfo(oneSrc.getPath().getFileSystem(conf), oneSrc.getPath(), null));
}
- if (work.isCopyToMigratedTxnTable()) {
- if (isDuplicateCopy(dstFs, toPath, srcFiles)) {
- return 0;
- }
-
- Path modifiedToPath = getModifiedToPath(toPath);
- if (modifiedToPath == null) {
- console.printError("ReplCopyTask : Write id is not set in the config by open txn task for migration");
- return 6;
- }
- toPath = modifiedToPath;
- }
}
LOG.debug("ReplCopyTask numFiles: {}", srcFiles.size());
@@ -235,38 +170,6 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
}
}
- private boolean isDuplicateCopy(FileSystem dstFs, Path toPath, List<ReplChangeManager.FileInfo> srcFiles)
- throws IOException {
- if (work.isNeedCheckDuplicateCopy()) {
- updateSrcFileListForDupCopy(dstFs, toPath, srcFiles,
- ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID, ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID);
- if (srcFiles.isEmpty()) {
- LOG.info("All files are already present in the base directory. Skipping copy task.");
- return true;
- }
- }
- return false;
- }
-
- private Path getModifiedToPath(Path toPath) {
- // If a direct (move-optimized) copy is triggered for data to a migrated transactional table, then it
- // should have a write ID allocated by the parent ReplTxnTask. Use it to create the base or delta directory.
- // The toPath received in ReplCopyWork points to the table/partition base location,
- // so we just need to append the base or delta directory.
- // getDeleteDestIfExist returns true if this is a repl load for a replace/insert overwrite event, and
- // hence a base directory needs to be created. If false, it is a repl load for a regular insert into or
- // load flow, and hence only a delta directory is created.
- Long writeId = ReplUtils.getMigrationCurrentTblWriteId(conf);
- if (writeId == null) {
- return null;
- }
- // Set stmt id 0 for bootstrap load as the directory needs to be searched during incremental load to avoid any
- // duplicate copy from the source. Check HIVE-21197 for more detail.
- int stmtId = (writeId.equals(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID)) ?
- ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID :
- context.getHiveTxnManager().getStmtIdAndIncrement();
- return new Path(toPath, AcidUtils.baseOrDeltaSubdir(work.getDeleteDestIfExist(), writeId, writeId, stmtId));
- }
private List<ReplChangeManager.FileInfo> filesInFileListing(FileSystem fs, Path dataPath)
throws IOException {
Path fileListing = new Path(dataPath, EximUtil.FILES_NAME);
@@ -319,35 +222,28 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
return "REPL_COPY";
}
- public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
- HiveConf conf, boolean isAutoPurge, boolean needRecycle,
- boolean copyToMigratedTxnTable) {
- return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, isAutoPurge, needRecycle,
- copyToMigratedTxnTable, true);
- }
public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
HiveConf conf, boolean isAutoPurge, boolean needRecycle,
- boolean copyToMigratedTxnTable, boolean readSourceAsFileList) {
+ boolean readSourceAsFileList) {
return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, isAutoPurge, needRecycle,
- copyToMigratedTxnTable, readSourceAsFileList, false);
+ readSourceAsFileList, false);
}
private static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
HiveConf conf, boolean isAutoPurge, boolean needRecycle,
- boolean copyToMigratedTxnTable, boolean readSourceAsFileList,
+ boolean readSourceAsFileList,
boolean overWrite) {
Task<?> copyTask = null;
LOG.debug("ReplCopyTask:getLoadCopyTask: {}=>{}", srcPath, dstPath);
if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false, overWrite);
rcwork.setReadSrcAsFilesList(readSourceAsFileList);
- if (replicationSpec.isReplace() && (conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION) || copyToMigratedTxnTable)) {
+ if (replicationSpec.isReplace() && (conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION))) {
rcwork.setDeleteDestIfExist(true);
rcwork.setAutoPurge(isAutoPurge);
rcwork.setNeedRecycle(needRecycle);
}
- rcwork.setCopyToMigratedTxnTable(copyToMigratedTxnTable);
// For replace case, duplicate check should not be done. The new base directory will automatically make the older
// data invisible. Doing duplicate check and ignoring copy will cause consistency issue if there are multiple
// replace events getting replayed in the first incremental load.
@@ -365,7 +261,8 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
HiveConf conf) {
- return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, false);
+ return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false,
+ true, false);
}
/*
@@ -375,6 +272,6 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
HiveConf conf, boolean readSourceAsFileList, boolean overWrite) {
return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false,
- false, readSourceAsFileList, overWrite);
+ readSourceAsFileList, overWrite);
}
}
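With the migration flag gone, the public overload kept here takes only the copy-behaviour switches; a hedged usage sketch of the trimmed signature (the paths and flag values below are purely illustrative) is:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;

final class LoadCopyTaskSketch {
  static Task<?> copyTaskFor(ReplicationSpec spec, HiveConf conf) {
    Path src = new Path("/repl/dumpdir/db/t1/data");   // hypothetical dump data path
    Path dst = new Path("/warehouse/db.db/t1");        // hypothetical target table path
    return ReplCopyTask.getLoadCopyTask(spec, src, dst, conf,
        false,   // isAutoPurge
        false,   // needRecycle
        true);   // readSourceAsFileList: read src as a _files listing rather than raw files
  }
}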
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
index 9131aee..48721d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
@@ -92,38 +92,12 @@ public class ReplTxnTask extends Task<ReplTxnWork> {
LOG.info("Replayed OpenTxn Event for policy " + replPolicy + " with srcTxn " +
work.getTxnIds().toString() + " and target txn id " + txnIds.toString());
return 0;
- case REPL_MIGRATION_OPEN_TXN:
- // if transaction is already opened (mostly by repl load command), then close it.
- if (txnManager.isTxnOpen()) {
- long txnId = txnManager.getCurrentTxnId();
- txnManager.commitTxn();
- LOG.info("Committed txn from REPL_MIGRATION_OPEN_TXN : " + txnId);
- }
- Long txnIdMigration = txnManager.openTxn(context, user);
- long writeId = txnManager.getTableWriteId(work.getDbName(), work.getTableName());
- String validTxnList = txnManager.getValidTxns().toString();
- conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList);
- conf.set(ReplUtils.REPL_CURRENT_TBL_WRITE_ID, Long.toString(writeId));
- LOG.info("Started open txn for migration : " + txnIdMigration + " with valid txn list : " +
- validTxnList + " and write id " + writeId);
- return 0;
case REPL_ABORT_TXN:
for (long txnId : work.getTxnIds()) {
txnManager.replRollbackTxn(replPolicy, txnId);
LOG.info("Replayed AbortTxn Event for policy " + replPolicy + " with srcTxn " + txnId);
}
return 0;
- case REPL_MIGRATION_COMMIT_TXN:
- assert (work.getReplLastIdInfo() != null);
- long txnIdMigrationCommit = txnManager.getCurrentTxnId();
- CommitTxnRequest commitTxnRequestMigr = new CommitTxnRequest(txnIdMigrationCommit);
- commitTxnRequestMigr.setReplLastIdInfo(work.getReplLastIdInfo());
- txnManager.replCommitTxn(commitTxnRequestMigr);
- conf.unset(ValidTxnList.VALID_TXNS_KEY);
- conf.unset(ReplUtils.REPL_CURRENT_TBL_WRITE_ID);
- LOG.info("Committed Migration Txn with replLastIdInfo: " + work.getReplLastIdInfo() + " for txnId: " +
- txnIdMigrationCommit);
- return 0;
case REPL_COMMIT_TXN:
// Currently only one commit txn per event is supported.
assert (work.getTxnIds().size() == 1);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckTask.java
index 03e8c4e..4dba12c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckTask.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.Serializable;
@@ -35,12 +37,14 @@ import java.io.Serializable;
public class AckTask extends Task<AckWork> implements Serializable {
private static final long serialVersionUID = 1L;
+ private Logger LOG = LoggerFactory.getLogger(AckTask.class);
@Override
public int execute() {
try {
Path ackPath = work.getAckFilePath();
Utils.create(ackPath, conf);
+ LOG.info("Created ack file : {} ", ackPath);
} catch (SemanticException e) {
setException(e);
return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
index d8b10bd..a141aa2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
@@ -30,9 +30,6 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.ddl.table.partition.add.AlterTableAddPartitionDesc;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
-import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
@@ -40,7 +37,6 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration;
import com.google.common.collect.ImmutableList;
@@ -49,7 +45,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import static org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration.getHiveUpdater;
public class FSTableEvent implements TableEvent {
private final Path fromPathMetadata;
@@ -106,38 +101,6 @@ public class FSTableEvent implements TableEvent {
public ImportTableDesc tableDesc(String dbName) throws SemanticException {
try {
Table table = new Table(metadata.getTable());
- boolean externalTableOnSource = TableType.EXTERNAL_TABLE.equals(table.getTableType());
- // The table can be non-ACID in case of replication from a 2.6 cluster.
- if (!AcidUtils.isTransactionalTable(table)
- && hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES)
- && (table.getTableType() == TableType.MANAGED_TABLE)) {
- Hive hiveDb = Hive.get(hiveConf);
- //TODO : dump metadata should be read to make sure that migration is required.
- HiveStrictManagedMigration.TableMigrationOption migrationOption =
- HiveStrictManagedMigration.determineMigrationTypeAutomatically(table.getTTable(),
- table.getTableType(), null, hiveConf,
- hiveDb.getMSC(), true);
- HiveStrictManagedMigration.migrateTable(table.getTTable(), table.getTableType(),
- migrationOption, false,
- getHiveUpdater(hiveConf), hiveDb.getMSC(), hiveConf);
- // If the conversion is from non transactional to transactional table
- if (AcidUtils.isTransactionalTable(table)) {
- replicationSpec().setMigratingToTxnTable();
- // For migrated tables associate bootstrap writeId when replicating stats.
- if (table.getTTable().isSetColStats()) {
- table.getTTable().setWriteId(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID);
- }
- }
- if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
- // since we have converted to an external table now after applying the migration rules the
- // table location has to be set to null so that the location on the target is picked up
- // based on default configuration
- table.setDataLocation(null);
- if(!externalTableOnSource) {
- replicationSpec().setMigratingToExternalTable();
- }
- }
- }
ImportTableDesc tableDesc
= new ImportTableDesc(StringUtils.isBlank(dbName) ? table.getDbName() : dbName, table);
if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
@@ -190,7 +153,7 @@ public class FSTableEvent implements TableEvent {
StorageDescriptor sd = partition.getSd();
String location = sd.getLocation();
- if (!tblDesc.isExternal() || replicationSpec().isMigratingToExternalTable()) {
+ if (!tblDesc.isExternal()) {
/**
* this is required for file listing of all files in a partition for managed table as described in
* {@link org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator}
@@ -207,8 +170,7 @@ public class FSTableEvent implements TableEvent {
colStatsDesc.setDbName(tblDesc.getDatabaseName());
columnStatistics = new ColumnStatistics(colStatsDesc, colStats.getStatsObj());
columnStatistics.setEngine(colStats.getEngine());
- writeId = replicationSpec().isMigratingToTxnTable() ?
- ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID : partition.getWriteId();
+ writeId = partition.getWriteId();
}
AlterTableAddPartitionDesc.PartitionDesc partitionDesc = new AlterTableAddPartitionDesc.PartitionDesc(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
index 1444e15..41e09e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.ddl.DDLWork;
import org.apache.hadoop.hive.ql.ddl.database.alter.owner.AlterDatabaseSetOwnerDesc;
import org.apache.hadoop.hive.ql.ddl.database.alter.poperties.AlterDatabaseSetPropertiesDesc;
@@ -115,7 +116,7 @@ public class LoadDatabase {
return allTables.isEmpty() && allFunctions.isEmpty();
}
- private Task<?> createDbTask(Database dbObj) {
+ private Task<?> createDbTask(Database dbObj) throws MetaException {
// note that we do not set location - for repl load, we want that auto-created.
CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc(dbObj.getName(), dbObj.getDescription(), null, null, false,
updateDbProps(dbObj, context.dumpDirectory));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index c4cfcf9..d751794 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -251,7 +251,6 @@ public class LoadPartitions {
boolean isOnlyDDLOperation = event.replicationSpec().isMetadataOnly()
|| (TableType.EXTERNAL_TABLE.equals(table.getTableType())
- && !event.replicationSpec().isMigratingToExternalTable()
);
if (isOnlyDDLOperation) {
@@ -272,17 +271,8 @@ public class LoadPartitions {
if (event.replicationSpec().isInReplicationScope() &&
context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
loadFileType = LoadFileType.IGNORE;
- if (event.replicationSpec().isMigratingToTxnTable()) {
- // Migrating to transactional tables in bootstrap load phase.
- // It is enough to copy all the original files under base_1 dir and so write-id is hardcoded to 1.
- // ReplTxnTask added earlier in the DAG ensures that the write-id=1 is made valid in HMS metadata.
- stagingDir = new Path(stagingDir, AcidUtils.baseDir(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID));
- }
} else {
- loadFileType = event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL :
- (event.replicationSpec().isMigratingToTxnTable()
- ? LoadFileType.KEEP_EXISTING
- : LoadFileType.OVERWRITE_EXISTING);
+ loadFileType = event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING;
stagingDir = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
}
boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
@@ -330,26 +320,11 @@ public class LoadPartitions {
LoadFileType loadFileType) {
MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false);
if (AcidUtils.isTransactionalTable(table)) {
- if (event.replicationSpec().isMigratingToTxnTable()) {
- // Write-id is hardcoded to 1 so that for migration, we just move all original files under base_1 dir.
- // ReplTxnTask added earlier in the DAG ensures that the write-id is made valid in HMS metadata.
- LoadTableDesc loadTableWork = new LoadTableDesc(
- tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(),
- loadFileType, ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID
- );
- loadTableWork.setInheritTableSpecs(false);
- loadTableWork.setStmtId(0);
-
- // Need to set insertOverwrite so base_1 is created instead of delta_1_1_0.
- loadTableWork.setInsertOverwrite(true);
- moveWork.setLoadTableWork(loadTableWork);
- } else {
- LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
- Collections.singletonList(tmpPath),
- Collections.singletonList(new Path(partSpec.getLocation())),
- true, null, null);
- moveWork.setMultiFilesDesc(loadFilesWork);
- }
+ LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
+ Collections.singletonList(tmpPath),
+ Collections.singletonList(new Path(partSpec.getLocation())),
+ true, null, null);
+ moveWork.setMultiFilesDesc(loadFilesWork);
} else {
LoadTableDesc loadTableWork = new LoadTableDesc(
tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(),
@@ -359,7 +334,6 @@ public class LoadPartitions {
moveWork.setLoadTableWork(loadTableWork);
}
moveWork.setIsInReplicationScope(event.replicationSpec().isInReplicationScope());
-
return TaskFactory.get(moveWork, context.hiveConf);
}
@@ -375,9 +349,6 @@ public class LoadPartitions {
throws MetaException, HiveException {
String child = Warehouse.makePartPath(partSpec.getPartSpec());
if (tableDesc.isExternal()) {
- if (event.replicationSpec().isMigratingToExternalTable()) {
- return new Path(tableDesc.getLocation(), child);
- }
String externalLocation =
ReplExternalTables.externalTableLocation(context.hiveConf, partSpec.getLocation());
return new Path(externalLocation);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 35ea777..8572f08 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -186,6 +186,9 @@ public class LoadTable {
throws Exception {
Table table = tblDesc.toTable(context.hiveConf);
ReplicationSpec replicationSpec = event.replicationSpec();
+ if (!tblDesc.isExternal()) {
+ tblDesc.setLocation(null);
+ }
Task<?> createTableTask =
tblDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), context.hiveConf);
if (tblRootTask == null) {
@@ -206,17 +209,6 @@ public class LoadTable {
Task<?> replTxnTask = TaskFactory.get(replTxnWork, context.hiveConf);
parentTask.addDependentTask(replTxnTask);
parentTask = replTxnTask;
- } else if (replicationSpec.isMigratingToTxnTable()) {
- // Non-transactional table is converted to transactional table.
- // The write-id 1 is used to copy data for the given table and also no writes are aborted.
- ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList(
- AcidUtils.getFullTableName(tblDesc.getDatabaseName(), tblDesc.getTableName()),
- new long[0], new BitSet(), ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID);
- ReplTxnWork replTxnWork = new ReplTxnWork(tblDesc.getDatabaseName(), tblDesc.getTableName(), null,
- validWriteIdList.writeToString(), ReplTxnWork.OperationType.REPL_WRITEID_STATE);
- Task<?> replTxnTask = TaskFactory.get(replTxnWork, context.hiveConf);
- parentTask.addDependentTask(replTxnTask);
- parentTask = replTxnTask;
}
boolean shouldCreateLoadTableTask = (
!isPartitioned(tblDesc)
@@ -224,7 +216,7 @@ public class LoadTable {
) || tuple.isConvertedFromManagedToExternal;
if (shouldCreateLoadTableTask) {
LOG.debug("adding dependent ReplTxnTask/CopyWork/MoveWork for table");
- Task<?> loadTableTask = loadTableTask(table, replicationSpec, new Path(tblDesc.getLocation()),
+ Task<?> loadTableTask = loadTableTask(table, replicationSpec, table.getDataLocation(),
event.dataPath());
parentTask.addDependentTask(loadTableTask);
}
@@ -282,14 +274,8 @@ public class LoadTable {
if (replicationSpec.isInReplicationScope() &&
context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
loadFileType = LoadFileType.IGNORE;
- if (event.replicationSpec().isMigratingToTxnTable()) {
- // Migrating to transactional tables in bootstrap load phase.
- // It is enough to copy all the original files under base_1 dir and so write-id is hardcoded to 1.
- // ReplTxnTask added earlier in the DAG ensures that the write-id=1 is made valid in HMS metadata.
- tmpPath = new Path(tmpPath, AcidUtils.baseDir(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID));
- }
} else {
- loadFileType = (replicationSpec.isReplace() || replicationSpec.isMigratingToTxnTable())
+ loadFileType = (replicationSpec.isReplace())
? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING;
tmpPath = PathUtils.getExternalTmpPath(tgtPath, context.pathInfo);
}
@@ -304,25 +290,11 @@ public class LoadTable {
MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false);
if (AcidUtils.isTransactionalTable(table)) {
- if (replicationSpec.isMigratingToTxnTable()) {
- // Write-id is hardcoded to 1 so that for migration, we just move all original files under base_1 dir.
- // ReplTxnTask added earlier in the DAG ensures that the write-id is made valid in HMS metadata.
- LoadTableDesc loadTableWork = new LoadTableDesc(
- tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
- loadFileType, ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID
- );
- loadTableWork.setStmtId(0);
-
- // Need to set insertOverwrite so base_1 is created instead of delta_1_1_0.
- loadTableWork.setInsertOverwrite(true);
- moveWork.setLoadTableWork(loadTableWork);
- } else {
- LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
- Collections.singletonList(tmpPath),
- Collections.singletonList(tgtPath),
- true, null, null);
- moveWork.setMultiFilesDesc(loadFilesWork);
- }
+ LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
+ Collections.singletonList(tmpPath),
+ Collections.singletonList(tgtPath),
+ true, null, null);
+ moveWork.setMultiFilesDesc(loadFilesWork);
} else {
LoadTableDesc loadTableWork = new LoadTableDesc(
tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
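The small addition near the top of this file is the core of HIVE-23995: for managed tables the replicated CREATE TABLE no longer carries the source location, so the replica's metastore resolves it from its own warehouse configuration, while external tables keep their existing location handling. Condensed from the hunk above (tblDesc and context are the names used in LoadTable itself):

// Condensed sketch of the new create-table path: clear the location for managed tables so the
// target cluster picks its own default, then build the CREATE TABLE task as before.
if (!tblDesc.isExternal()) {
  tblDesc.setLocation(null);
}
Task<?> createTableTask =
    tblDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), context.hiveConf);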
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index 52b6547..b00341a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -22,10 +22,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.ReplLastIdInfo;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ddl.DDLWork;
import org.apache.hadoop.hive.ql.ddl.database.alter.poperties.AlterDatabaseSetPropertiesDesc;
@@ -52,7 +49,6 @@ import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger;
import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
-import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
import org.slf4j.Logger;
import java.util.ArrayList;
@@ -224,34 +220,6 @@ public class IncrementalLoadTasksBuilder {
return addUpdateReplStateTasks(messageHandler.getUpdatedMetadata(), tasks);
}
- private Task<?> getMigrationCommitTxnTask(String dbName, String tableName,
- List<Map <String, String>> partSpec, String replState,
- Task<?> preCursor) throws SemanticException {
- ReplLastIdInfo replLastIdInfo = new ReplLastIdInfo(dbName, Long.parseLong(replState));
- replLastIdInfo.setTable(tableName);
- if (partSpec != null && !partSpec.isEmpty()) {
- List<String> partitionList = new ArrayList<>();
- for (Map <String, String> part : partSpec) {
- try {
- partitionList.add(Warehouse.makePartName(part, false));
- } catch (MetaException e) {
- throw new SemanticException(e.getMessage());
- }
- }
- replLastIdInfo.setPartitionList(partitionList);
- }
-
- Task<?> updateReplIdTxnTask = TaskFactory.get(new ReplTxnWork(replLastIdInfo, ReplTxnWork
- .OperationType.REPL_MIGRATION_COMMIT_TXN), conf);
-
- if (preCursor != null) {
- preCursor.addDependentTask(updateReplIdTxnTask);
- log.debug("Added {}:{} as a precursor of {}:{}", preCursor.getClass(), preCursor.getId(),
- updateReplIdTxnTask.getClass(), updateReplIdTxnTask.getId());
- }
- return updateReplIdTxnTask;
- }
-
private Task<?> tableUpdateReplStateTask(String dbName, String tableName,
Map<String, String> partSpec, String replState,
Task<?> preCursor) throws SemanticException {
@@ -300,14 +268,6 @@ public class IncrementalLoadTasksBuilder {
return importTasks;
}
- boolean needCommitTx = updatedMetaDataTracker.isNeedCommitTxn();
- // In migration flow, we should have only one table update per event.
- if (needCommitTx) {
- // Currently, only a commit txn event can have updates in multiple tables. Commit txn does not start
- // a txn and thus needCommitTx must have been set to false.
- assert updatedMetaDataTracker.getUpdateMetaDataList().size() <= 1;
- }
-
// Create a barrier task for dependency collection of import tasks
Task<?> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf);
@@ -320,44 +280,24 @@ public class IncrementalLoadTasksBuilder {
String tableName = updateMetaData.getTableName();
// If any partition is updated, then update repl state in partition object
- if (needCommitTx) {
- if (updateMetaData.getPartitionsList().size() > 0) {
- updateReplIdTask = getMigrationCommitTxnTask(dbName, tableName,
- updateMetaData.getPartitionsList(), replState, barrierTask);
- tasks.add(updateReplIdTask);
- // commit txn task will update repl id for table and database also.
- break;
- }
- } else {
- for (final Map<String, String> partSpec : updateMetaData.getPartitionsList()) {
- updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, partSpec, replState, barrierTask);
- tasks.add(updateReplIdTask);
- }
+
+ for (final Map<String, String> partSpec : updateMetaData.getPartitionsList()) {
+ updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, partSpec, replState, barrierTask);
+ tasks.add(updateReplIdTask);
}
+
// If any table/partition is updated, then update repl state in table object
if (tableName != null) {
- if (needCommitTx) {
- updateReplIdTask = getMigrationCommitTxnTask(dbName, tableName, null,
- replState, barrierTask);
- tasks.add(updateReplIdTask);
- // commit txn task will update repl id for database also.
- break;
- }
updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, null, replState, barrierTask);
tasks.add(updateReplIdTask);
}
- // If any table/partition is updated, then update repl state in db object
- if (needCommitTx) {
- updateReplIdTask = getMigrationCommitTxnTask(dbName, null, null,
- replState, barrierTask);
- tasks.add(updateReplIdTask);
- } else {
- // For table level load, need not update replication state for the database
- updateReplIdTask = dbUpdateReplStateTask(dbName, replState, barrierTask);
- tasks.add(updateReplIdTask);
- }
+
+ // For table level load, need not update replication state for the database
+ updateReplIdTask = dbUpdateReplStateTask(dbName, replState, barrierTask);
+ tasks.add(updateReplIdTask);
+
}
if (tasks.isEmpty()) {
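With the migration commit-txn branch removed, every repl-state update (partition, table, then database) is simply chained behind the same barrier task. A short sketch of that dependency pattern, using a placeholder work object for the update task, is:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;

final class BarrierChainSketch {
  static Task<?> chain(HiveConf conf) {
    // Barrier that collects all import tasks for the event batch.
    Task<?> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf);
    // Placeholder for a repl-state update task; in the builder this carries the DDL work
    // that advances the replication state on the partition, table, or database.
    Task<?> updateReplStateTask = TaskFactory.get(new DependencyCollectionWork(), conf);
    // The update only runs once everything feeding the barrier has finished.
    barrierTask.addDependentTask(updateReplStateTask);
    return barrierTask;
  }
}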
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 137cc29..985bc39 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -36,19 +36,16 @@ import org.apache.hadoop.hive.ql.ddl.table.partition.PartitionUtils;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
-import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
-import org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
@@ -61,7 +58,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import static org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration.TableMigrationOption.MANAGED;
public class ReplUtils {
@@ -102,14 +98,6 @@ public class ReplUtils {
// One file per database, named after the db name. The directory is not created for db level replication.
public static final String REPL_TABLE_LIST_DIR_NAME = "_tables";
- // Migrating to transactional tables in bootstrap load phase.
- // It is enough to copy all the original files under base_1 dir and so write-id is hardcoded to 1.
- public static final Long REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID = 1L;
-
- // we keep the statement id as 0 so that the base directory is created with 0 and is easy to find out during
- // duplicate check. Note : Stmt id is not used for base directory now, but to avoid misuse later, its maintained.
- public static final int REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID = 0;
-
// Configuration to enable/disable dumping ACID tables. Used only for testing and shouldn't be
// seen in production or in case of tests other than the ones where it's required.
public static final String REPL_DUMP_INCLUDE_ACID_TABLES = "hive.repl.dump.include.acid.tables";
@@ -239,45 +227,10 @@ public class ReplUtils {
return val;
}
- public static boolean isTableMigratingToTransactional(HiveConf conf,
- org.apache.hadoop.hive.metastore.api.Table tableObj)
- throws TException, IOException {
- if (conf.getBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES) &&
- !AcidUtils.isTransactionalTable(tableObj) &&
- TableType.valueOf(tableObj.getTableType()) == TableType.MANAGED_TABLE) {
- //TODO : isPathOwnByHive is hard coded to true, need to get it from repl dump metadata.
- HiveStrictManagedMigration.TableMigrationOption migrationOption =
- HiveStrictManagedMigration.determineMigrationTypeAutomatically(tableObj, TableType.MANAGED_TABLE,
- null, conf, null, true);
- return migrationOption == MANAGED;
- }
- return false;
- }
-
- private static void addOpenTxnTaskForMigration(String actualDbName, String actualTblName,
- HiveConf conf,
- UpdatedMetaDataTracker updatedMetaDataTracker,
- List<Task<?>> taskList,
- Task<?> childTask) {
- Task<?> replTxnTask = TaskFactory.get(new ReplTxnWork(actualDbName, actualTblName,
- ReplTxnWork.OperationType.REPL_MIGRATION_OPEN_TXN), conf);
- replTxnTask.addDependentTask(childTask);
- updatedMetaDataTracker.setNeedCommitTxn(true);
- taskList.add(replTxnTask);
- }
- public static List<Task<?>> addOpenTxnTaskForMigration(String actualDbName,
- String actualTblName, HiveConf conf,
- UpdatedMetaDataTracker updatedMetaDataTracker,
- Task<?> childTask,
- org.apache.hadoop.hive.metastore.api.Table tableObj)
- throws IOException, TException {
+ public static List<Task<?>> addChildTask(Task<?> childTask) {
List<Task<?>> taskList = new ArrayList<>();
taskList.add(childTask);
- if (isTableMigratingToTransactional(conf, tableObj) && updatedMetaDataTracker != null) {
- addOpenTxnTaskForMigration(actualDbName, actualTblName, conf, updatedMetaDataTracker,
- taskList, childTask);
- }
return taskList;
}
@@ -288,19 +241,10 @@ public class ReplUtils {
long writeId)
throws IOException, TException {
List<Task<?>> taskList = new ArrayList<>();
- boolean isMigratingToTxn = ReplUtils.isTableMigratingToTransactional(conf, tableObj);
- ColumnStatsUpdateWork work = new ColumnStatsUpdateWork(colStats, isMigratingToTxn);
+ ColumnStatsUpdateWork work = new ColumnStatsUpdateWork(colStats);
work.setWriteId(writeId);
Task<?> task = TaskFactory.get(work, conf);
taskList.add(task);
- // If the table is going to be migrated to a transactional table we will need to open
- // and commit a transaction to associate a valid writeId with the statistics.
- if (isMigratingToTxn) {
- ReplUtils.addOpenTxnTaskForMigration(colStats.getStatsDesc().getDbName(),
- colStats.getStatsDesc().getTableName(), conf, updatedMetadata, taskList,
- task);
- }
-
return taskList;
}
@@ -346,14 +290,6 @@ public class ReplUtils {
return envContext;
}
- public static Long getMigrationCurrentTblWriteId(HiveConf conf) {
- String writeIdString = conf.get(ReplUtils.REPL_CURRENT_TBL_WRITE_ID);
- if (writeIdString == null) {
- return null;
- }
- return Long.parseLong(writeIdString);
- }
-
// Only for testing, we do not include ACID tables in the dump (and replicate) if config says so.
public static boolean includeAcidTableInDump(HiveConf conf) {
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)) {
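In ReplUtils the migration-aware helpers (isTableMigratingToTransactional, addOpenTxnTaskForMigration, getMigrationCurrentTblWriteId) are removed; the message handlers further down now call the much simpler addChildTask, which only wraps the given task in a singleton list, and column-stats tasks no longer carry an isMigratingToTxn flag. A minimal sketch of the replacement helper follows, using a generic type parameter as a stand-in rather than Hive's org.apache.hadoop.hive.ql.exec.Task:

// Hedged sketch of the simplified helper: wrap one child task in a list.
// The type parameter T is a stand-in, not Hive's exec.Task class.
import java.util.ArrayList;
import java.util.List;

final class ReplUtilsSketch {
  private ReplUtilsSketch() {}

  static <T> List<T> addChildTask(T childTask) {
    List<T> taskList = new ArrayList<>();
    taskList.add(childTask);
    return taskList;
  }

  public static void main(String[] args) {
    System.out.println(addChildTask("rename-partition-task")); // prints [rename-partition-task]
  }
}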
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index dea2ca2..79ccbc5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -65,12 +65,9 @@ import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
import org.apache.hadoop.hive.ql.plan.MoveWork;
-import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,7 +89,6 @@ import java.util.Map;
import java.util.TreeMap;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
-import static org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration.getHiveUpdater;
/**
* ImportSemanticAnalyzer.
@@ -207,22 +203,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
- private static void upgradeTableDesc(org.apache.hadoop.hive.metastore.api.Table tableObj, MetaData rv,
- EximUtil.SemanticAnalyzerWrapperContext x)
- throws IOException, TException, HiveException {
- x.getLOG().debug("Converting table " + tableObj.getTableName() + " of type " + tableObj.getTableType() +
- " with para " + tableObj.getParameters());
- //TODO : isPathOwnedByHive is hard coded to true, need to get it from repl dump metadata.
- TableType tableType = TableType.valueOf(tableObj.getTableType());
- HiveStrictManagedMigration.TableMigrationOption migrationOption =
- HiveStrictManagedMigration.determineMigrationTypeAutomatically(tableObj, tableType,
- null, x.getConf(), x.getHive().getMSC(), true);
- HiveStrictManagedMigration.migrateTable(tableObj, tableType, migrationOption, false,
- getHiveUpdater(x.getConf()), x.getHive().getMSC(), x.getConf());
- x.getLOG().debug("Converted table " + tableObj.getTableName() + " of type " + tableObj.getTableType() +
- " with para " + tableObj.getParameters());
- }
-
/**
* The same code is used from both the "repl load" as well as "import".
* Given that "repl load" now supports two modes "repl load dbName [location]" and
@@ -278,26 +258,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
ImportTableDesc tblDesc;
org.apache.hadoop.hive.metastore.api.Table tblObj = rv.getTable();
try {
- // The table can be non acid in case of replication from a cluster with STRICT_MANAGED set to false.
- if (!TxnUtils.isTransactionalTable(tblObj) && replicationSpec.isInReplicationScope() &&
- x.getConf().getBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES) &&
- (TableType.valueOf(tblObj.getTableType()) == TableType.MANAGED_TABLE)) {
- //TODO : dump metadata should be read to make sure that migration is required.
- upgradeTableDesc(tblObj, rv, x);
- //if the conversion is from non transactional to transactional table
- if (TxnUtils.isTransactionalTable(tblObj)) {
- replicationSpec.setMigratingToTxnTable();
- }
- tblDesc = getBaseCreateTableDescFromTable(dbname, tblObj);
- if (TableType.valueOf(tblObj.getTableType()) == TableType.EXTERNAL_TABLE) {
- replicationSpec.setMigratingToExternalTable();
- tblDesc.setExternal(true);
- // we should set this to null so default location for external tables is chosen on target
- tblDesc.setLocation(null);
- }
- } else {
- tblDesc = getBaseCreateTableDescFromTable(dbname, tblObj);
- }
+ tblDesc = getBaseCreateTableDescFromTable(dbname, tblObj);
} catch (Exception e) {
throw new HiveException(e);
}
@@ -415,8 +376,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
StorageDescriptor sd = partition.getSd();
String location = null;
- if (replicationSpec.isInReplicationScope() && tblDesc.isExternal() &&
- !replicationSpec.isMigratingToExternalTable()) {
+ if (replicationSpec.isInReplicationScope() && tblDesc.isExternal()) {
location = ReplExternalTables.externalTableLocation(conf, partition.getSd().getLocation());
LOG.debug("partition {} has data location: {}", partition, location);
} else {
@@ -452,10 +412,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
LoadFileType lft;
boolean isSkipTrash = false;
boolean needRecycle = false;
- boolean copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable();
- if (replicationSpec.isInReplicationScope() && (copyToMigratedTxnTable ||
- x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) {
+ if (replicationSpec.isInReplicationScope() && (x.getCtx().getConf().getBoolean(
+ REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) {
lft = LoadFileType.IGNORE;
destPath = loadPath = tgtPath;
isSkipTrash = MetaStoreUtils.isSkipTrash(table.getParameters());
@@ -484,7 +443,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
} else {
destPath = loadPath = x.getCtx().getExternalTmpPath(tgtPath);
lft = replace ? LoadFileType.REPLACE_ALL :
- replicationSpec.isMigratingToTxnTable() ? LoadFileType.KEEP_EXISTING : LoadFileType.OVERWRITE_EXISTING;
+ LoadFileType.OVERWRITE_EXISTING;
}
}
@@ -503,7 +462,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (replicationSpec.isInReplicationScope()) {
boolean copyAtLoad = x.getConf().getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf(),
- isSkipTrash, needRecycle, copyToMigratedTxnTable, copyAtLoad);
+ isSkipTrash, needRecycle, copyAtLoad);
} else {
copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false));
}
@@ -521,9 +480,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
} else {
LoadTableDesc loadTableWork = new LoadTableDesc(
loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft, writeId);
- if (replicationSpec.isMigratingToTxnTable()) {
- loadTableWork.setInsertOverwrite(replace);
- }
loadTableWork.setStmtId(stmtId);
moveWork.setLoadTableWork(loadTableWork);
}
@@ -580,7 +536,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0);
boolean isSkipTrash = false;
boolean needRecycle = false;
- boolean copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable();
if (shouldSkipDataCopyInReplScope(tblDesc, replicationSpec)
|| (tblDesc.isExternal() && tblDesc.getLocation() == null)) {
@@ -613,8 +568,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
LoadFileType loadFileType;
Path destPath;
- if (replicationSpec.isInReplicationScope() && (copyToMigratedTxnTable ||
- x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) {
+ if (replicationSpec.isInReplicationScope() && (x.getCtx().getConf().getBoolean(
+ REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) {
loadFileType = LoadFileType.IGNORE;
destPath = tgtLocation;
isSkipTrash = MetaStoreUtils.isSkipTrash(table.getParameters());
@@ -626,8 +581,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
} else {
loadFileType = replicationSpec.isReplace() ?
- LoadFileType.REPLACE_ALL :
- replicationSpec.isMigratingToTxnTable() ? LoadFileType.KEEP_EXISTING : LoadFileType.OVERWRITE_EXISTING;
+ LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING;
//Replication scope the write id will be invalid
boolean useStagingDirectory = !AcidUtils.isTransactionalTable(table.getParameters()) ||
replicationSpec.isInReplicationScope();
@@ -651,7 +605,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (replicationSpec.isInReplicationScope()) {
boolean copyAtLoad = x.getConf().getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, new Path(srcLocation), destPath,
- x.getConf(), isSkipTrash, needRecycle, copyToMigratedTxnTable, copyAtLoad);
+ x.getConf(), isSkipTrash, needRecycle, copyAtLoad);
} else {
copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false));
}
@@ -682,9 +636,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
partSpec.getPartSpec(),
loadFileType,
writeId);
- if (replicationSpec.isMigratingToTxnTable()) {
- loadTableWork.setInsertOverwrite(replicationSpec.isReplace());
- }
loadTableWork.setStmtId(stmtId);
loadTableWork.setInheritTableSpecs(false);
moveWork.setLoadTableWork(loadTableWork);
@@ -725,8 +676,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
private static boolean shouldSkipDataCopyInReplScope(ImportTableDesc tblDesc, ReplicationSpec replicationSpec) {
return ((replicationSpec != null)
&& replicationSpec.isInReplicationScope()
- && tblDesc.isExternal()
- && !replicationSpec.isMigratingToExternalTable());
+ && tblDesc.isExternal());
}
/**
@@ -741,17 +691,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
we don't do anything since for external table partitions the path is already set correctly
in {@link org.apache.hadoop.hive.ql.parse.repl.load.message.TableHandler}
*/
- if (replicationSpec.isMigratingToExternalTable()) {
- // at this point the table.getDataLocation() should be set already for external tables
- // using the correct values of default warehouse external table location on target.
- partSpec.setLocation(new Path(tblDesc.getLocation(),
- Warehouse.makePartPath(partSpec.getPartSpec())).toString());
- LOG.debug("partition spec {} has location set to {} for a table migrating to external table"
- + " from managed table",
- StringUtils.join(partSpec.getPartSpec().entrySet(), ","),
- partSpec.getLocation()
- );
- }
return;
}
Path tgtPath;
@@ -1197,20 +1136,15 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
tblDesc.getDatabaseName(),
tblDesc.getTableName(),
null);
- if (replicationSpec.isMigratingToTxnTable()) {
- x.setOpenTxnTask(TaskFactory.get(new ReplTxnWork(tblDesc.getDatabaseName(),
- tblDesc.getTableName(), ReplTxnWork.OperationType.REPL_MIGRATION_OPEN_TXN), x.getConf()));
- updatedMetadata.setNeedCommitTxn(true);
- }
}
if (tblDesc.getLocation() == null) {
- if (!waitOnPrecursor){
+ if (!waitOnPrecursor) {
tblDesc.setLocation(wh.getDefaultTablePath(parentDb, tblDesc.getTableName(), tblDesc.isExternal()).toString());
} else {
tblDesc.setLocation(
- wh.getDnsPath(wh.getDefaultTablePath(tblDesc.getDatabaseName(), tblDesc.getTableName(), tblDesc.isExternal())
- ).toString());
+ wh.getDnsPath(wh.getDefaultTablePath(tblDesc.getDatabaseName(), tblDesc.getTableName(), tblDesc.isExternal())
+ ).toString());
}
}
@@ -1290,7 +1224,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
table = createNewTableMetadataObject(tblDesc, true);
isOldTableValid = false;
}
-
// Table existed, and is okay to replicate into, not dropping and re-creating.
if (isPartitioned(tblDesc)) {
x.getLOG().debug("table partitioned");
@@ -1352,6 +1285,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
} else {
+ if (table != null && table.getTableType() != TableType.EXTERNAL_TABLE && table.getSd().getLocation() != null) {
+ tblDesc.setLocation(table.getSd().getLocation());
+ }
x.getLOG().debug("table non-partitioned");
if (!replicationSpec.isMetadataOnly()) {
// repl-imports are replace-into unless the event is insert-into
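Besides removing the table-upgrade path and migration flags, the one behavioural addition in ImportSemanticAnalyzer is in the non-partitioned branch above: if the table already exists on the target and is not EXTERNAL, the import keeps the table's current storage-descriptor location instead of deriving a new one, which is the "don't set location for managed tables" behaviour this patch is about. Below is a small, self-contained sketch of that rule using hypothetical stand-in types (ExistingTable, ImportDescSketch), not Hive's metastore or ImportTableDesc classes:

// Hypothetical stand-ins for metastore Table / ImportTableDesc, illustration only.
class ExistingTable {
  String tableType;    // e.g. "MANAGED_TABLE" or "EXTERNAL_TABLE"
  String sdLocation;   // current storage-descriptor location, may be null
  ExistingTable(String tableType, String sdLocation) {
    this.tableType = tableType;
    this.sdLocation = sdLocation;
  }
}

class ImportDescSketch {
  String location;     // location the import/create desc will carry

  // Managed table already present on the target: reuse its location as-is.
  void maybeReuseExistingLocation(ExistingTable table) {
    if (table != null && !"EXTERNAL_TABLE".equals(table.tableType) && table.sdLocation != null) {
      location = table.sdLocation;
    }
  }

  public static void main(String[] args) {
    ImportDescSketch desc = new ImportDescSketch();
    desc.maybeReuseExistingLocation(new ExistingTable("MANAGED_TABLE", "/warehouse/db1.db/t1"));
    System.out.println(desc.location); // /warehouse/db1.db/t1
  }
}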
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 8675ace..cb0f4b3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -46,8 +46,6 @@ public class ReplicationSpec {
//TxnIds snapshot
private String validTxnList = null;
private Type specType = Type.DEFAULT; // DEFAULT means REPL_LOAD or BOOTSTRAP_DUMP or EXPORT
- private boolean isMigratingToTxnTable = false;
- private boolean isMigratingToExternalTable = false;
private boolean needDupCopyCheck = false;
//Determine if replication is done using repl or export-import
private boolean isRepl = false;
@@ -404,20 +402,6 @@ public class ReplicationSpec {
}
}
- public boolean isMigratingToTxnTable() {
- return isMigratingToTxnTable;
- }
- public void setMigratingToTxnTable() {
- isMigratingToTxnTable = true;
- }
-
- public boolean isMigratingToExternalTable() {
- return isMigratingToExternalTable;
- }
-
- public void setMigratingToExternalTable() {
- isMigratingToExternalTable = true;
- }
public static void copyLastReplId(Map<String, String> srcParameter, Map<String, String> destParameter) {
String lastReplId = srcParameter.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
index 57f3043..ed7aa8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import java.io.Serializable;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -57,9 +56,6 @@ public class RenamePartitionHandler extends AbstractMessageHandler {
oldPartSpec.put(fs.getName(), beforeIterator.next());
newPartSpec.put(fs.getName(), afterIterator.next());
}
- if (ReplUtils.isTableMigratingToTransactional(context.hiveConf, tableObj)) {
- replicationSpec.setMigratingToTxnTable();
- }
AlterTableRenamePartitionDesc renamePtnDesc = new AlterTableRenamePartitionDesc(
tableName, oldPartSpec, newPartSpec, replicationSpec, null);
@@ -69,8 +65,7 @@ public class RenamePartitionHandler extends AbstractMessageHandler {
context.log.debug("Added rename ptn task : {}:{}->{}",
renamePtnTask.getId(), oldPartSpec, newPartSpec);
updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, newPartSpec);
- return ReplUtils.addOpenTxnTaskForMigration(actualDbName, actualTblName,
- context.hiveConf, updatedMetadata, renamePtnTask, tableObj);
+ return ReplUtils.addChildTask(renamePtnTask);
} catch (Exception e) {
throw (e instanceof SemanticException)
? (SemanticException) e
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
index 7a4cb93..05e094b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import java.io.Serializable;
import java.util.List;
public class RenameTableHandler extends AbstractMessageHandler {
@@ -58,9 +57,7 @@ public class RenameTableHandler extends AbstractMessageHandler {
TableName oldName = TableName.fromString(tableObjBefore.getTableName(), null, oldDbName);
TableName newName = TableName.fromString(tableObjAfter.getTableName(), null, newDbName);
ReplicationSpec replicationSpec = context.eventOnlyReplicationSpec();
- if (ReplUtils.isTableMigratingToTransactional(context.hiveConf, tableObjAfter)) {
- replicationSpec.setMigratingToTxnTable();
- }
+
AlterTableRenameDesc renameTableDesc =
new AlterTableRenameDesc(oldName, replicationSpec, false, newName.getNotEmptyDbTable());
renameTableDesc.setWriteId(msg.getWriteId());
@@ -76,8 +73,7 @@ public class RenameTableHandler extends AbstractMessageHandler {
// Note : edge-case here in interaction with table-level REPL LOAD, where that nukes out
// tablesUpdated. However, we explicitly don't support repl of that sort, and error out above
// if so. If that should ever change, this will need reworking.
- return ReplUtils.addOpenTxnTaskForMigration(oldDbName, tableObjBefore.getTableName(),
- context.hiveConf, updatedMetadata, renameTableTask, tableObjAfter);
+ return ReplUtils.addChildTask(renameTableTask);
} catch (Exception e) {
throw (e instanceof SemanticException)
? (SemanticException) e
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
index 6c3a7eb..85e9f92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import java.io.Serializable;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -67,8 +66,7 @@ public class TruncatePartitionHandler extends AbstractMessageHandler {
updatedMetadata.set(context.dmd.getEventTo().toString(), tName.getDb(), tName.getTable(), partSpec);
try {
- return ReplUtils.addOpenTxnTaskForMigration(tName.getDb(), tName.getTable(),
- context.hiveConf, updatedMetadata, truncatePtnTask, tblObj);
+ return ReplUtils.addChildTask(truncatePtnTask);
} catch (Exception e) {
throw new SemanticException(e.getMessage());
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
index 2b12be4..6a50c8a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import java.io.Serializable;
import java.util.List;
public class TruncateTableHandler extends AbstractMessageHandler {
@@ -46,8 +45,7 @@ public class TruncateTableHandler extends AbstractMessageHandler {
updatedMetadata.set(context.dmd.getEventTo().toString(), tName.getDb(), tName.getTable(), null);
try {
- return ReplUtils.addOpenTxnTaskForMigration(tName.getDb(), tName.getTable(),
- context.hiveConf, updatedMetadata, truncateTableTask, msg.getTableObjBefore());
+ return ReplUtils.addChildTask(truncateTableTask);
} catch (Exception e) {
throw new SemanticException(e.getMessage());
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
index c90ea43..1490025 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
@@ -44,8 +44,6 @@ public class ColumnStatsUpdateWork implements Serializable, DDLDescWithWriteId {
private final String colName;
private final String colType;
private final ColumnStatistics colStats;
- private final boolean isMigratingToTxn; // Is the table for which we are updating stats going
- // to be migrated during replication.
private long writeId;
public ColumnStatsUpdateWork(String partName,
@@ -61,12 +59,10 @@ public class ColumnStatsUpdateWork implements Serializable, DDLDescWithWriteId {
this.colName = colName;
this.colType = colType;
this.colStats = null;
- this.isMigratingToTxn = false;
}
- public ColumnStatsUpdateWork(ColumnStatistics colStats, boolean isMigratingToTxn) {
+ public ColumnStatsUpdateWork(ColumnStatistics colStats) {
this.colStats = colStats;
- this.isMigratingToTxn = isMigratingToTxn;
this.partName = null;
this.mapProp = null;
this.dbName = null;
@@ -106,8 +102,6 @@ public class ColumnStatsUpdateWork implements Serializable, DDLDescWithWriteId {
public ColumnStatistics getColStats() { return colStats; }
- public boolean getIsMigratingToTxn() { return isMigratingToTxn; }
-
@Override
public void setWriteId(long writeId) {
this.writeId = writeId;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
index dd01b21..21da20f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
@@ -57,8 +57,6 @@ public class ReplCopyWork extends CopyWork {
private String distCpDoAsUser = null;
- private boolean copyToMigratedTxnTable;
-
private boolean checkDuplicateCopy = false;
private boolean overWrite = false;
@@ -112,14 +110,6 @@ public class ReplCopyWork extends CopyWork {
this.isAutoPurge = isAutoPurge;
}
- public boolean isCopyToMigratedTxnTable() {
- return copyToMigratedTxnTable;
- }
-
- public void setCopyToMigratedTxnTable(boolean copyToMigratedTxnTable) {
- this.copyToMigratedTxnTable = copyToMigratedTxnTable;
- }
-
public boolean isNeedCheckDuplicateCopy() {
return checkDuplicateCopy;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java
index a9f98cc..7e16a7c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java
@@ -50,8 +50,7 @@ public class ReplTxnWork implements Serializable {
* Different kind of events supported for replaying.
*/
public enum OperationType {
- REPL_OPEN_TXN, REPL_ABORT_TXN, REPL_COMMIT_TXN, REPL_ALLOC_WRITE_ID, REPL_WRITEID_STATE,
- REPL_MIGRATION_OPEN_TXN, REPL_MIGRATION_COMMIT_TXN
+ REPL_OPEN_TXN, REPL_ABORT_TXN, REPL_COMMIT_TXN, REPL_ALLOC_WRITE_ID, REPL_WRITEID_STATE
}
OperationType operation;
@@ -93,17 +92,6 @@ public class ReplTxnWork implements Serializable {
this.operation = type;
}
- public ReplTxnWork(String dbName, String tableName, OperationType type) {
- this(null, dbName, tableName, null, type, null, null);
- assert type == OperationType.REPL_MIGRATION_OPEN_TXN;
- }
-
- public ReplTxnWork(ReplLastIdInfo replLastIdInfo, OperationType type) {
- this(null, null, null, null, type, null, null);
- assert type == OperationType.REPL_MIGRATION_COMMIT_TXN;
- this.replLastIdInfo = replLastIdInfo;
- }
-
public void addWriteEventInfo(WriteEventInfo writeEventInfo) {
if (this.writeEventInfos == null) {
this.writeEventInfos = new ArrayList<>();
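ReplTxnWork loses its two migration-only operation types and their constructors, leaving only the five event-replay operations. For reference, a stand-alone sketch of the reduced enum (constant names copied from the hunk above; the wrapper class is hypothetical):

// Reduced operation set after the migration entries were dropped.
public class ReplTxnWorkSketch {
  public enum OperationType {
    REPL_OPEN_TXN, REPL_ABORT_TXN, REPL_COMMIT_TXN, REPL_ALLOC_WRITE_ID, REPL_WRITEID_STATE
  }

  public static void main(String[] args) {
    for (OperationType op : OperationType.values()) {
      System.out.println(op);
    }
  }
}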
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java b/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java
deleted file mode 100644
index bc94702..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java
+++ /dev/null
@@ -1,1715 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.util;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.LogUtils;
-import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.cli.CommonCliOptions;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
-import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.apache.hadoop.hive.metastore.utils.HiveStrictManagedUtils;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.shims.HadoopShims;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-
-import static java.util.stream.Collectors.toList;
-import static org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE;
-import static org.apache.hadoop.hive.metastore.TableType.MANAGED_TABLE;
-import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
-
-public class HiveStrictManagedMigration {
-
- private static final Logger LOG = LoggerFactory.getLogger(HiveStrictManagedMigration.class);
- @VisibleForTesting
- static int RC = 0;
-
- public enum TableMigrationOption {
- NONE, // Do nothing
- VALIDATE, // No migration, just validate that the tables
- AUTOMATIC, // Automatically determine if the table should be managed or external
- EXTERNAL, // Migrate tables to external tables
- MANAGED // Migrate tables as managed transactional tables
- }
-
- private static class RunOptions {
- final String dbRegex;
- final String tableRegex;
- final String oldWarehouseRoot;
- final TableMigrationOption migrationOption;
- final Properties confProps;
- boolean shouldModifyManagedTableLocation;
- final boolean shouldModifyManagedTableOwner;
- final boolean shouldModifyManagedTablePermissions;
- boolean shouldMoveExternal;
- final boolean dryRun;
- final TableType tableType;
- final int tablePoolSize;
- final String fsOperationUser;
-
- RunOptions(String dbRegex,
- String tableRegex,
- String oldWarehouseRoot,
- TableMigrationOption migrationOption,
- Properties confProps,
- boolean shouldModifyManagedTableLocation,
- boolean shouldModifyManagedTableOwner,
- boolean shouldModifyManagedTablePermissions,
- boolean shouldMoveExternal,
- boolean dryRun,
- TableType tableType,
- int tablePoolSize,
- String fsOperationUser) {
- super();
- this.dbRegex = dbRegex;
- this.tableRegex = tableRegex;
- this.oldWarehouseRoot = oldWarehouseRoot;
- this.migrationOption = migrationOption;
- this.confProps = confProps;
- this.shouldModifyManagedTableLocation = shouldModifyManagedTableLocation;
- this.shouldModifyManagedTableOwner = shouldModifyManagedTableOwner;
- this.shouldModifyManagedTablePermissions = shouldModifyManagedTablePermissions;
- this.shouldMoveExternal = shouldMoveExternal;
- this.dryRun = dryRun;
- this.tableType = tableType;
- this.tablePoolSize = tablePoolSize;
- this.fsOperationUser = fsOperationUser;
- }
-
- public void setShouldModifyManagedTableLocation(boolean shouldModifyManagedTableLocation) {
- this.shouldModifyManagedTableLocation = shouldModifyManagedTableLocation;
- }
-
- public void setShouldMoveExternal(boolean shouldMoveExternal) {
- this.shouldMoveExternal = shouldMoveExternal;
- }
-
- @Override
- public String toString() {
- return "RunOptions{" +
- "dbRegex='" + dbRegex + '\'' +
- ", tableRegex='" + tableRegex + '\'' +
- ", oldWarehouseRoot='" + oldWarehouseRoot + '\'' +
- ", migrationOption=" + migrationOption +
- ", confProps=" + confProps +
- ", shouldModifyManagedTableLocation=" + shouldModifyManagedTableLocation +
- ", shouldModifyManagedTableOwner=" + shouldModifyManagedTableOwner +
- ", shouldModifyManagedTablePermissions=" + shouldModifyManagedTablePermissions +
- ", shouldMoveExternal=" + shouldMoveExternal +
- ", dryRun=" + dryRun +
- ", tableType=" + tableType +
- ", tablePoolSize=" + tablePoolSize +
- ", fsOperationUser=" + fsOperationUser +
- '}';
- }
- }
-
- private static class OwnerPermsOptions {
- final String ownerName;
- final String groupName;
- final FsPermission dirPerms;
- final FsPermission filePerms;
-
- OwnerPermsOptions(String ownerName, String groupName, FsPermission dirPerms, FsPermission filePerms) {
- this.ownerName = ownerName;
- this.groupName = groupName;
- this.dirPerms = dirPerms;
- this.filePerms = filePerms;
- }
- }
-
- private static class WarehouseRootCheckResult {
- final boolean shouldModifyManagedTableLocation;
- final boolean shouldMoveExternal;
- final Path targetPath;
- final HadoopShims.HdfsEncryptionShim encryptionShim;
- final HadoopShims.HdfsErasureCodingShim ecShim;
-
- WarehouseRootCheckResult(
- boolean shouldModifyManagedTableLocation,
- boolean shouldMoveExternal,
- Path curWhRootPath,
- HadoopShims.HdfsEncryptionShim encryptionShim,
- HadoopShims.HdfsErasureCodingShim ecShim) {
- this.shouldModifyManagedTableLocation = shouldModifyManagedTableLocation;
- this.shouldMoveExternal = shouldMoveExternal;
- this.targetPath = curWhRootPath;
- this.encryptionShim = encryptionShim;
- this.ecShim = ecShim;
- }
- }
-
- public static void main(String[] args) throws Exception {
- RunOptions runOptions;
- RC = 0;
-
- try {
- Options opts = createOptions();
- CommandLine cli = new GnuParser().parse(opts, args);
-
- if (cli.hasOption('h')) {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(HiveStrictManagedMigration.class.getName(), opts);
- return;
- }
-
- runOptions = createRunOptions(cli);
- } catch (Exception err) {
- throw new Exception("Error processing options", err);
- }
-
- HiveStrictManagedMigration migration = null;
- try {
- HiveConf conf = hiveConf == null ? new HiveConf() : hiveConf;
- WarehouseRootCheckResult warehouseRootCheckResult = checkOldWarehouseRoot(runOptions, conf);
- runOptions.setShouldModifyManagedTableLocation(
- warehouseRootCheckResult.shouldModifyManagedTableLocation);
- runOptions.setShouldMoveExternal(
- warehouseRootCheckResult.shouldMoveExternal);
- boolean createExternalDirsForDbs = checkExternalWarehouseDir(conf);
- OwnerPermsOptions ownerPermsOptions = checkOwnerPermsOptions(runOptions, conf);
-
- migration = new HiveStrictManagedMigration(
- conf, runOptions, createExternalDirsForDbs, ownerPermsOptions, warehouseRootCheckResult);
- migration.run();
- } catch (Exception err) {
- LOG.error("Failed with error", err);
- RC = -1;
- } finally {
- if (migration != null) {
- migration.cleanup();
- }
- }
-
- // TODO: Something is preventing the process from terminating after main(), adding exit() as hacky solution.
- if (hiveConf == null) {
- System.exit(RC);
- }
- }
-
- static Options createOptions() {
- Options result = new Options();
-
- // -hiveconf x=y
- result.addOption(OptionBuilder
- .withValueSeparator()
- .hasArgs(2)
- .withArgName("property=value")
- .withLongOpt("hiveconf")
- .withDescription("Use value for given property")
- .create());
-
- result.addOption(OptionBuilder
- .withLongOpt("dryRun")
- .withDescription("Show what migration actions would be taken without actually running commands")
- .create());
-
- result.addOption(OptionBuilder
- .withLongOpt("dbRegex")
- .withDescription("Regular expression to match database names on which this tool will be run")
- .hasArg()
- .create('d'));
-
- result.addOption(OptionBuilder
- .withLongOpt("tableRegex")
- .withDescription("Regular expression to match table names on which this tool will be run")
- .hasArg()
- .create('t'));
-
- result.addOption(OptionBuilder
- .withLongOpt("oldWarehouseRoot")
- .withDescription("Location of the previous warehouse root")
- .hasArg()
- .create());
-
- result.addOption(OptionBuilder
- .withLongOpt("migrationOption")
- .withDescription("Table migration option (automatic|external|managed|validate|none)")
- .hasArg()
- .create('m'));
-
- result.addOption(OptionBuilder
- .withLongOpt("shouldModifyManagedTableLocation")
- .withDescription("Whether managed tables should have their data moved from " +
- "the old warehouse path to the current warehouse path")
- .create());
-
- result.addOption(OptionBuilder
- .withLongOpt("shouldModifyManagedTableOwner")
- .withDescription("Whether managed tables should have their directory owners changed to the hive user")
- .create());
-
- result.addOption(OptionBuilder
- .withLongOpt("shouldModifyManagedTablePermissions")
- .withDescription("Whether managed tables should have their directory permissions changed to conform to " +
- "strict managed tables mode")
- .create());
-
- result.addOption(OptionBuilder
- .withLongOpt("modifyManagedTables")
- .withDescription("This setting enables the shouldModifyManagedTableLocation, " +
- "shouldModifyManagedTableOwner, shouldModifyManagedTablePermissions options")
- .create());
-
- result.addOption(OptionBuilder
- .withLongOpt("shouldMoveExternal")
- .withDescription("Whether tables living in the old warehouse path should have their data moved to the" +
- " default external location. Applicable only if migrationOption = external")
- .create());
-
- result.addOption(OptionBuilder
- .withLongOpt("help")
- .withDescription("print help message")
- .create('h'));
-
- result.addOption(OptionBuilder
- .withLongOpt("tablePoolSize")
- .withDescription("Number of threads to process tables.")
- .hasArg()
- .create("tn"));
-
- result.addOption(OptionBuilder
- .withLongOpt("tableType")
- .withDescription(String.format("Table type to match tables on which this tool will be run. " +
- "Possible values: %s Default: all tables",
- Arrays.stream(TableType.values()).map(Enum::name).collect(Collectors.joining("|"))))
- .hasArg()
- .withArgName("table type")
- .create("tt"));
-
- result.addOption(OptionBuilder
- .withLongOpt("fsOperationUser")
- .withDescription("If set, migration tool will impersonate this user to carry out write operations on file " +
- "system. Useful e.g. if this tool is run as hive, but chown-ing is also a requirement." +
- "If this is unset file operations will be run in the name of the user running this process (or kinit'ed " +
- "user in Kerberos environments)")
- .hasArg()
- .create());
-
- return result;
- }
-
- static RunOptions createRunOptions(CommandLine cli) throws Exception {
- // Process --hiveconf
- // Get hiveconf param values and set the System property values
- Properties confProps = cli.getOptionProperties("hiveconf");
- for (String propKey : confProps.stringPropertyNames()) {
- LOG.info("Setting {}={}", propKey, confProps.getProperty(propKey));
- if (propKey.equalsIgnoreCase("hive.root.logger")) {
- // TODO: logging currently goes to hive.log
- CommonCliOptions.splitAndSetLogger(propKey, confProps);
- } else {
- System.setProperty(propKey, confProps.getProperty(propKey));
- }
- }
-
- LogUtils.initHiveLog4j();
-
- String dbRegex = cli.getOptionValue("dbRegex", ".*");
- String tableRegex = cli.getOptionValue("tableRegex", ".*");
- TableMigrationOption migrationOption =
- TableMigrationOption.valueOf(cli.getOptionValue("migrationOption", "none").toUpperCase());
- boolean shouldModifyManagedTableLocation = cli.hasOption("shouldModifyManagedTableLocation");
- boolean shouldModifyManagedTableOwner = cli.hasOption("shouldModifyManagedTableOwner");
- boolean shouldModifyManagedTablePermissions = cli.hasOption("shouldModifyManagedTablePermissions");
- if (cli.hasOption("modifyManagedTables")) {
- shouldModifyManagedTableLocation = true;
- shouldModifyManagedTableOwner = true;
- shouldModifyManagedTablePermissions = true;
- }
- String oldWarehouseRoot = cli.getOptionValue("oldWarehouseRoot");
- boolean shouldMoveExternal = cli.hasOption("shouldMoveExternal");
- if (shouldMoveExternal && !migrationOption.equals(TableMigrationOption.EXTERNAL)) {
- throw new IllegalArgumentException("Please select external as migration option, it is required for " +
- "shouldMoveExternal option.");
- }
- if (shouldModifyManagedTableLocation && shouldMoveExternal) {
- throw new IllegalArgumentException("Options shouldModifyManagedTableLocation and " +
- "shouldMoveExternal cannot be used at the same time. Migration with move option on " +
- " managed tables either ends up with them remaining managed or converted to external, but can't be both.");
- }
- boolean dryRun = cli.hasOption("dryRun");
-
- String fsOperationUser = cli.getOptionValue("fsOperationUser");
-
- String tableTypeText = cli.getOptionValue("tableType");
-
- int defaultPoolSize = Runtime.getRuntime().availableProcessors() / 2;
- if (defaultPoolSize < 1) {
- defaultPoolSize = 1;
- }
-
- int databasePoolSize = getIntOptionValue(cli, "databasePoolSize", defaultPoolSize);
- if (databasePoolSize < 1) {
- throw new IllegalArgumentException("Please specify a positive integer option value for databasePoolSize");
- }
- int tablePoolSize = getIntOptionValue(cli, "tablePoolSize", defaultPoolSize);
- if (tablePoolSize < 1) {
- throw new IllegalArgumentException("Please specify a positive integer option value for tablePoolSize");
- }
-
- RunOptions runOpts = new RunOptions(
- dbRegex,
- tableRegex,
- oldWarehouseRoot,
- migrationOption,
- confProps,
- shouldModifyManagedTableLocation,
- shouldModifyManagedTableOwner,
- shouldModifyManagedTablePermissions,
- shouldMoveExternal,
- dryRun,
- tableTypeText == null ? null : TableType.valueOf(tableTypeText),
- tablePoolSize,
- fsOperationUser);
- return runOpts;
- }
-
- private static int getIntOptionValue(CommandLine commandLine, String optionName, int defaultValue) {
- if (commandLine.hasOption(optionName)) {
- try {
- return Integer.parseInt(commandLine.getOptionValue(optionName));
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException("Please specify a positive integer option value for " + optionName, e);
- }
- }
- return defaultValue;
- }
-
- private final HiveConf conf;
- private final RunOptions runOptions;
- private final boolean createExternalDirsForDbs;
- private final Path targetPath;
- private final HadoopShims.HdfsEncryptionShim encryptionShim;
- private final HadoopShims.HdfsErasureCodingShim ecShim;
- private final String ownerName;
- private final String groupName;
- private final FsPermission dirPerms;
- private final FsPermission filePerms;
- private final UserGroupInformation fsOperationUser;
-
- private CloseableThreadLocal<HiveMetaStoreClient> hms;
- private ThreadLocal<Warehouse> wh;
- private ThreadLocal<Warehouse> oldWh;
- private CloseableThreadLocal<HiveUpdater> hiveUpdater;
-
- private AtomicBoolean failuresEncountered;
- private AtomicBoolean failedValidationChecks;
-
- HiveStrictManagedMigration(HiveConf conf, RunOptions runOptions, boolean createExternalDirsForDbs,
- OwnerPermsOptions ownerPermsOptions, WarehouseRootCheckResult warehouseRootCheckResult) {
- this.conf = conf;
- this.runOptions = runOptions;
- this.createExternalDirsForDbs = createExternalDirsForDbs;
- this.ownerName = ownerPermsOptions.ownerName;
- this.groupName = ownerPermsOptions.groupName;
- this.dirPerms = ownerPermsOptions.dirPerms;
- this.filePerms = ownerPermsOptions.filePerms;
- this.targetPath = warehouseRootCheckResult.targetPath;
- this.encryptionShim = warehouseRootCheckResult.encryptionShim;
- this.ecShim = warehouseRootCheckResult.ecShim;
-
- // Make sure all --hiveconf settings get added to the HiveConf.
- // This allows utility-specific settings (such as strict.managed.tables.migration.owner)
- // to be set via command line.
- if (runOptions.confProps != null) {
- for (String propKey : runOptions.confProps.stringPropertyNames()) {
- this.conf.set(propKey, runOptions.confProps.getProperty(propKey));
- }
- }
-
- try {
- if (runOptions.fsOperationUser != null) {
- fsOperationUser = UserGroupInformation.createProxyUser(runOptions.fsOperationUser,
- UserGroupInformation.getLoginUser());
- } else {
- fsOperationUser = UserGroupInformation.getLoginUser();
- }
- } catch (IOException e) {
- throw new RuntimeException("Error while setting up UGI for FS operations.");
- }
-
- this.hms = new CloseableThreadLocal<>(() -> {
- try {
- HiveMetaStoreClient hiveMetaStoreClient = new HiveMetaStoreClient(conf);
- if (hiveConf != null) {
- SessionState ss = SessionState.start(conf);
- ss.applyAuthorizationPolicy();
- }
- return hiveMetaStoreClient;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, runOptions.tablePoolSize);
- wh = ThreadLocal.withInitial(() -> {
- try {
- return new Warehouse(conf);
- } catch (MetaException e) {
- throw new RuntimeException(e);
- }
- });
- if (runOptions.shouldModifyManagedTableLocation || runOptions.shouldMoveExternal) {
- Configuration oldConf = new Configuration(conf);
- HiveConf.setVar(oldConf, HiveConf.ConfVars.METASTOREWAREHOUSE, runOptions.oldWarehouseRoot);
-
- oldWh = ThreadLocal.withInitial(() -> {
- try {
- return new Warehouse(oldConf);
- } catch (MetaException e) {
- throw new RuntimeException(e);
- }
- });
- }
- this.hiveUpdater = new CloseableThreadLocal<>(() -> {
- try {
- return new HiveUpdater(conf, true);
- } catch (HiveException e) {
- throw new RuntimeException(e);
- }
- }, runOptions.tablePoolSize);
-
- this.failuresEncountered = new AtomicBoolean(false);
- this.failedValidationChecks = new AtomicBoolean(false);
- }
-
- void run() throws Exception {
- LOG.info("Starting with {}", runOptions);
-
- List<String> databases = hms.get().getDatabases(runOptions.dbRegex); //TException
- LOG.info("Found {} databases", databases.size());
- ForkJoinPool tablePool = new ForkJoinPool(
- runOptions.tablePoolSize,
- new NamedForkJoinWorkerThreadFactory("Table-"),
- getUncaughtExceptionHandler(),
- false);
- databases.forEach(dbName -> processDatabase(dbName, tablePool));
- LOG.info("Done processing databases.");
-
- if (failuresEncountered.get()) {
- throw new HiveException("One or more failures encountered during processing.");
- }
- if (failedValidationChecks.get()) {
- throw new HiveException("One or more tables failed validation checks for strict managed table mode.");
- }
- }
-
- private Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
- return (t, e) -> LOG.error(String.format("Thread %s exited with error", t.getName()), e);
- }
-
- static WarehouseRootCheckResult checkOldWarehouseRoot(RunOptions runOptions, HiveConf conf) throws IOException {
- boolean shouldModifyManagedTableLocation = runOptions.shouldModifyManagedTableLocation;
- boolean shouldMoveExternal = runOptions.shouldMoveExternal;
- Path targetPath = null;
- HadoopShims.HdfsEncryptionShim encryptionShim = null;
- HadoopShims.HdfsErasureCodingShim ecShim = null;
-
- if (shouldMoveExternal && !checkExternalWarehouseDir(conf)) {
- LOG.info("External warehouse path not specified/empty. Disabling shouldMoveExternal");
- shouldMoveExternal = false;
- }
-
- if (shouldModifyManagedTableLocation || shouldMoveExternal) {
- if (runOptions.oldWarehouseRoot == null) {
- LOG.info("oldWarehouseRoot is not specified. Disabling shouldModifyManagedTableLocation and " +
- "shouldMoveExternal");
- shouldModifyManagedTableLocation = false;
- shouldMoveExternal = false;
- } else {
- String currentPathString = shouldModifyManagedTableLocation ?
- HiveConf.getVar(conf, HiveConf.ConfVars.METASTOREWAREHOUSE) :
- HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_METASTORE_WAREHOUSE_EXTERNAL);
- if (arePathsEqual(conf, runOptions.oldWarehouseRoot, currentPathString)) {
- LOG.info("oldWarehouseRoot is the same as the target path {}."
- + " Disabling shouldModifyManagedTableLocation and shouldMoveExternal",
- runOptions.oldWarehouseRoot);
- shouldModifyManagedTableLocation = false;
- shouldMoveExternal = false;
- } else {
- Path oldWhRootPath = new Path(runOptions.oldWarehouseRoot);
- targetPath = new Path(currentPathString);
- FileSystem oldWhRootFs = oldWhRootPath.getFileSystem(conf);
- FileSystem curWhRootFs = targetPath.getFileSystem(conf);
- oldWhRootPath = oldWhRootFs.makeQualified(oldWhRootPath);
- targetPath = curWhRootFs.makeQualified(targetPath);
- if (!FileUtils.equalsFileSystem(oldWhRootFs, curWhRootFs)) {
- LOG.info("oldWarehouseRoot {} has a different FS than the target path {}."
- + " Disabling shouldModifyManagedTableLocation and shouldMoveExternal",
- runOptions.oldWarehouseRoot, currentPathString);
- shouldModifyManagedTableLocation = false;
- shouldMoveExternal = false;
- } else {
- if (!isHdfs(oldWhRootFs)) {
- LOG.info("Warehouse is using non-HDFS FileSystem {}. Disabling shouldModifyManagedTableLocation and" +
- "shouldMoveExternal", oldWhRootFs.getUri());
- shouldModifyManagedTableLocation = false;
- shouldMoveExternal = false;
- } else {
- encryptionShim = ShimLoader.getHadoopShims().createHdfsEncryptionShim(oldWhRootFs, conf);
- if (!hasEquivalentEncryption(encryptionShim, oldWhRootPath, targetPath)) {
- LOG.info("oldWarehouseRoot {} and target path {} have different encryption zones." +
- " Disabling shouldModifyManagedTableLocation and shouldMoveExternal",
- oldWhRootPath, targetPath);
- shouldModifyManagedTableLocation = false;
- shouldMoveExternal = false;
- } else {
- ecShim = ShimLoader.getHadoopShims().createHdfsErasureCodingShim(oldWhRootFs, conf);
- if (!hasEquivalentErasureCodingPolicy(ecShim, oldWhRootPath, targetPath)) {
- LOG.info("oldWarehouseRoot {} and target path {} have different erasure coding policies." +
- " Disabling shouldModifyManagedTableLocation and shouldMoveExternal",
- oldWhRootPath, targetPath);
- shouldModifyManagedTableLocation = false;
- shouldMoveExternal = false;
- }
- }
- }
- }
- }
- }
- }
-
- return new WarehouseRootCheckResult(shouldModifyManagedTableLocation, shouldMoveExternal,
- targetPath, encryptionShim, ecShim);
- }
-
- static OwnerPermsOptions checkOwnerPermsOptions(RunOptions runOptions, HiveConf conf) {
- String ownerName = null;
- String groupName = null;
- FsPermission dirPerms = null;
- FsPermission filePerms = null;
-
- if (runOptions.shouldModifyManagedTableOwner) {
- ownerName = conf.get("strict.managed.tables.migration.owner", "hive");
- groupName = conf.get("strict.managed.tables.migration.group", null);
- }
- if (runOptions.shouldModifyManagedTablePermissions) {
- String dirPermsString = conf.get("strict.managed.tables.migration.dir.permissions", "700");
- if (dirPermsString != null) {
- dirPerms = new FsPermission(dirPermsString);
- }
- String filePermsString = conf.get("strict.managed.tables.migration.file.permissions", "700");
- if (filePermsString != null) {
- filePerms = new FsPermission(filePermsString);
- }
- }
-
- return new OwnerPermsOptions(ownerName, groupName, dirPerms, filePerms);
- }
-
- static boolean checkExternalWarehouseDir(HiveConf conf) {
- String externalWarehouseDir = conf.getVar(HiveConf.ConfVars.HIVE_METASTORE_WAREHOUSE_EXTERNAL);
- return externalWarehouseDir != null && !externalWarehouseDir.isEmpty();
- }
-
- void processDatabase(String dbName, ForkJoinPool tablePool) {
- try {
- LOG.info("Processing database {}", dbName);
- Database dbObj = hms.get().getDatabase(dbName);
-
- if (createExternalDirsForDbs) {
- createExternalDbDir(dbObj);
- }
-
- boolean modifyLocation = shouldModifyDatabaseLocation(dbObj);
-
- if (modifyLocation) {
- Path newDefaultDbLocation = getDefaultDbPathManagedOrExternal(dbName);
-
- LOG.info("Changing location of database {} to {}", dbName, newDefaultDbLocation);
- if (!runOptions.dryRun) {
- FileSystem fs = getFS(newDefaultDbLocation, conf, fsOperationUser);
- FileUtils.mkdir(fs, newDefaultDbLocation, conf);
- // Set appropriate owner/perms of the DB dir only, no need to recurse
- checkAndSetFileOwnerPermissions(fs, newDefaultDbLocation,
- ownerName, groupName, dirPerms, null, runOptions.dryRun, false);
- }
- }
-
- List<String> tableNames;
- if (runOptions.tableType == null) {
- tableNames = hms.get().getTables(dbName, runOptions.tableRegex);
- LOG.debug("found {} tables in {}", tableNames.size(), dbName);
- } else {
- tableNames = hms.get().getTables(dbName, runOptions.tableRegex, runOptions.tableType);
- LOG.debug("found {} {}s in {}", tableNames.size(), runOptions.tableType.name(), dbName);
- }
-
- boolean errorsInThisDb = !tablePool.submit(() -> tableNames.parallelStream()
- .map(tableName -> processTable(dbObj, tableName, modifyLocation))
- .reduce(true, (aBoolean, aBoolean2) -> aBoolean && aBoolean2)).get();
- if (errorsInThisDb) {
- failuresEncountered.set(true);
- }
-
- // Finally update the DB location. This would prevent subsequent runs of the migration from processing this DB.
- if (modifyLocation) {
- if (errorsInThisDb) {
- LOG.error("Not updating database location for {} since an error was encountered. " +
- "The migration must be run again for this database.", dbObj.getName());
- } else {
- if (!runOptions.dryRun) {
- Path newDefaultDbLocation = getDefaultDbPathManagedOrExternal(dbName);
- // dbObj after this call would have the new DB location.
- // Keep that in mind if anything below this requires the old DB path.
- hiveUpdater.get().updateDbLocation(dbObj, newDefaultDbLocation);
- }
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.info("Cancel processing " + dbName, e);
- } catch (TException | IOException | HiveException | ExecutionException ex) {
- LOG.error("Error processing database " + dbName, ex);
- failuresEncountered.set(true);
- }
- }
-
- private Path getDefaultDbPathManagedOrExternal(String dbName) throws MetaException {
- return runOptions.shouldMoveExternal ?
- wh.get().getDefaultExternalDatabasePath(dbName) :
- wh.get().getDefaultDatabasePath(dbName);
- }
-
- public static boolean migrateTable(Table tableObj, TableType tableType, TableMigrationOption migrationOption,
- boolean dryRun, HiveUpdater hiveUpdater, IMetaStoreClient hms, Configuration conf)
- throws HiveException, IOException, TException {
- switch (migrationOption) {
- case EXTERNAL:
- migrateToExternalTable(tableObj, tableType, dryRun, hiveUpdater);
- break;
- case MANAGED:
- migrateToManagedTable(tableObj, tableType, dryRun, hiveUpdater, hms, conf);
- break;
- case NONE:
- break;
- case VALIDATE:
- // Check that the table is valid under strict managed tables mode.
- String reason = HiveStrictManagedUtils.validateStrictManagedTable(conf, tableObj);
- if (reason != null) {
- LOG.warn(reason);
- return true;
- }
- break;
- default:
- throw new IllegalArgumentException("Unexpected table migration option " + migrationOption);
- }
- return false;
- }
-
- boolean processTable(Database dbObj, String tableName, boolean modifyLocation) {
- try {
- String dbName = dbObj.getName();
- LOG.debug("Processing table {}", getQualifiedName(dbName, tableName));
-
- Table tableObj = hms.get().getTable(dbName, tableName);
- TableType tableType = TableType.valueOf(tableObj.getTableType());
-
- TableMigrationOption migrationOption = runOptions.migrationOption;
- if (migrationOption == TableMigrationOption.AUTOMATIC) {
- migrationOption = determineMigrationTypeAutomatically(
- tableObj, tableType, ownerName, conf, hms.get(), null);
- }
-
- boolean failedValidationCheck = migrateTable(tableObj, tableType, migrationOption, runOptions.dryRun,
- hiveUpdater.get(), hms.get(), conf);
-
- if (failedValidationCheck) {
- this.failedValidationChecks.set(true);
- return true;
- }
-
- String tablePathString = tableObj.getSd().getLocation();
- if (StringUtils.isEmpty(tablePathString)) {
- // When using this tool in full automatic mode (no DB/table regexes and automatic migration option) we may
- // encounter sysdb / information_schema databases. These should not be moved, as they have a null location.
- return true;
- }
- Path tablePath = new Path(tablePathString);
-
- boolean shouldMoveTable = modifyLocation && (
- (MANAGED_TABLE.name().equals(tableObj.getTableType()) && runOptions.shouldModifyManagedTableLocation) ||
- (EXTERNAL_TABLE.name().equals(tableObj.getTableType()) && runOptions.shouldMoveExternal));
-
- if (shouldMoveTable && shouldModifyTableLocation(dbObj, tableObj)) {
- Path newTablePath = wh.get().getDnsPath(
- new Path(getDefaultDbPathManagedOrExternal(dbName),
- MetaStoreUtils.encodeTableName(tableName.toLowerCase())));
- moveTableData(dbObj, tableObj, newTablePath);
- if (!runOptions.dryRun) {
- // File ownership/permission checks should be done on the new table path.
- tablePath = newTablePath;
- }
- }
-
- if (MANAGED_TABLE.equals(tableType)) {
- if (runOptions.shouldModifyManagedTableOwner || runOptions.shouldModifyManagedTablePermissions) {
- FileSystem fs = tablePath.getFileSystem(conf);
- if (isHdfs(fs)) {
- // TODO: what about partitions not in the default location?
- checkAndSetFileOwnerPermissions(fs, tablePath,
- ownerName, groupName, dirPerms, filePerms, runOptions.dryRun, true);
- }
- }
- }
- } catch (Exception ex) {
- LOG.error("Error processing table " + getQualifiedName(dbObj.getName(), tableName), ex);
- return false;
- }
- return true;
- }
-
- boolean shouldModifyDatabaseLocation(Database dbObj) throws IOException, MetaException {
- String dbName = dbObj.getName();
- if (runOptions.shouldModifyManagedTableLocation || runOptions.shouldMoveExternal) {
- // Check if the database location is in the default location based on the old warehouse root.
- // If so then change the database location to the default based on the current warehouse root.
- String dbLocation = dbObj.getLocationUri();
- Path oldDefaultDbLocation = oldWh.get().getDefaultDatabasePath(dbName);
- if (arePathsEqual(conf, dbLocation, oldDefaultDbLocation.toString())) {
- if (hasEquivalentEncryption(encryptionShim, oldDefaultDbLocation, targetPath)) {
- if (hasEquivalentErasureCodingPolicy(ecShim, oldDefaultDbLocation, targetPath)) {
- return true;
- } else {
- LOG.info("{} and {} have different EC policies. Will not change database location for {}",
- oldDefaultDbLocation, targetPath, dbName);
- }
- } else {
- LOG.info("{} and {} are on different encryption zones. Will not change database location for {}",
- oldDefaultDbLocation, targetPath, dbName);
- }
- }
- }
- return false;
- }
-
- boolean shouldModifyTableLocation(Database dbObj, Table tableObj) throws IOException, MetaException {
- // Only managed tables should be passed in here.
- // Check if table is in the default table location based on the old warehouse root.
- // If so then change the table location to the default based on the current warehouse root.
- // The existing table directory will also be moved to the new default database directory.
- String tableLocation = tableObj.getSd().getLocation();
- Path oldDefaultTableLocation = oldWh.get().getDefaultTablePath(dbObj, tableObj.getTableName());
- if (arePathsEqual(conf, tableLocation, oldDefaultTableLocation.toString())) {
- if (hasEquivalentEncryption(encryptionShim, oldDefaultTableLocation, targetPath)) {
- if (hasEquivalentErasureCodingPolicy(ecShim, oldDefaultTableLocation, targetPath)) {
- return true;
- } else {
- LOG.info("{} and {} have different EC policies. Will not change table location for {}",
- oldDefaultTableLocation, targetPath, getQualifiedName(tableObj));
- }
- } else {
- LOG.info("{} and {} are on different encryption zones. Will not change table location for {}",
- oldDefaultTableLocation, targetPath, getQualifiedName(tableObj));
- }
- }
- return false;
- }
-
- boolean shouldModifyPartitionLocation(Database dbObj, Table tableObj, Partition partObj,
- Map<String, String> partSpec) throws IOException, MetaException {
- String partLocation = partObj.getSd().getLocation();
- Path oldDefaultPartLocation = runOptions.shouldMoveExternal ?
- oldWh.get().getPartitionPath(dbObj, tableObj, partSpec.values().stream().collect(toList())):
- oldWh.get().getDefaultPartitionPath(dbObj, tableObj, partSpec);
- if (arePathsEqual(conf, partLocation, oldDefaultPartLocation.toString())) {
- // No need to check encryption zone and EC policy. Data was moved already along with the whole table.
- return true;
- }
- return false;
- }
-
- void createExternalDbDir(Database dbObj) throws IOException, MetaException {
- Path externalTableDbPath = wh.get().getDefaultExternalDatabasePath(dbObj.getName());
- FileSystem fs = getFS(externalTableDbPath, conf, fsOperationUser);
- if (!fs.exists(externalTableDbPath)) {
- String dbOwner = ownerName;
- String dbGroup = null;
-
- String dbOwnerName = dbObj.getOwnerName();
- if (dbOwnerName != null && !dbOwnerName.isEmpty()) {
- switch (dbObj.getOwnerType()) {
- case USER:
- dbOwner = dbOwnerName;
- break;
- case ROLE:
- break;
- case GROUP:
- dbGroup = dbOwnerName;
- break;
- }
- }
-
- if (dbOwner == null) {
- dbOwner = conf.get("strict.managed.tables.migration.owner", "hive");
- }
- LOG.info("Creating external table directory for database {} at {} with ownership {}/{}",
- dbObj.getName(), externalTableDbPath, dbOwner, dbGroup);
- if (!runOptions.dryRun) {
- // Just rely on parent perms/umask for permissions.
- fs.mkdirs(externalTableDbPath);
- checkAndSetFileOwnerPermissions(fs, externalTableDbPath, dbOwner, dbGroup,
- null, null, runOptions.dryRun, false);
- }
- } else {
- LOG.info("Not creating external table directory for database {} - {} already exists.",
- dbObj.getName(), externalTableDbPath);
- // Leave the directory owner/perms as-is if the path already exists.
- }
- }
-
- void moveTableData(Database dbObj, Table tableObj, Path newTablePath) throws HiveException, IOException, TException {
- String dbName = tableObj.getDbName();
- String tableName = tableObj.getTableName();
-
- Path oldTablePath = new Path(tableObj.getSd().getLocation());
-
- LOG.info("Moving location of {} from {} to {}", getQualifiedName(tableObj), oldTablePath, newTablePath);
-
- // Move table directory.
- if (!runOptions.dryRun) {
- FileSystem fs = getFS(newTablePath, conf, fsOperationUser);
- if (fs.exists(oldTablePath)) {
- boolean movedData = fs.rename(oldTablePath, newTablePath);
- if (!movedData) {
- String msg = String.format("Unable to move data directory for table %s from %s to %s",
- getQualifiedName(tableObj), oldTablePath, newTablePath);
- throw new HiveException(msg);
- }
- }
- }
-
- // An error occurring between this point and the update of the table's location in the metastore
- // may leave the data in the new location while the table/partitions still point to the old paths.
- // The migration would be _REQUIRED_ to run again (and pass) for the data and table/partition
- // locations to be back in sync.
-
- if (isPartitionedTable(tableObj)) {
- List<String> partNames = hms.get().listPartitionNames(dbName, tableName, Short.MAX_VALUE);
- // TODO: Fetch partitions in batches?
- // TODO: Threadpool to process partitions?
- for (String partName : partNames) {
- Partition partObj = hms.get().getPartition(dbName, tableName, partName);
- Map<String, String> partSpec =
- Warehouse.makeSpecFromValues(tableObj.getPartitionKeys(), partObj.getValues());
- if (shouldModifyPartitionLocation(dbObj, tableObj, partObj, partSpec)) {
- // Table directory (which includes the partition directory) has already been moved,
- // just update the partition location in the metastore.
- if (!runOptions.dryRun) {
- Path newPartPath = wh.get().getPartitionPath(newTablePath, partSpec);
- hiveUpdater.get().updatePartitionLocation(dbName, tableObj, partName, partObj, newPartPath);
- }
- }
- }
- }
-
- // Finally update the table location. This would prevent this tool from processing this table again
- // on subsequent runs of the migration.
- if (!runOptions.dryRun) {
- hiveUpdater.get().updateTableLocation(tableObj, newTablePath);
- }
- }
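The data move itself is a single FileSystem.rename of the table directory, so partition directories that live under it travel along and only their metastore locations need to be updated afterwards. A minimal sketch of that step with placeholder local paths (the real tool additionally routes the call through the configured fs operation user and honours dry runs):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class MoveTableDataSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path oldTablePath = new Path("file:///tmp/old_wh/test.db/tacid");   // placeholder source
        Path newTablePath = new Path("file:///tmp/new_wh/test.db/tacid");   // placeholder target
        FileSystem fs = newTablePath.getFileSystem(conf);
        fs.mkdirs(newTablePath.getParent());                                // make sure the new db dir exists
        if (fs.exists(oldTablePath) && !fs.rename(oldTablePath, newTablePath)) {
          throw new IOException("Unable to move " + oldTablePath + " to " + newTablePath);
        }
      }
    }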
-
- static void renameFilesToConformToAcid(Table tableObj, IMetaStoreClient hms, Configuration conf, boolean dryRun)
- throws IOException, TException {
- if (isPartitionedTable(tableObj)) {
- String dbName = tableObj.getDbName();
- String tableName = tableObj.getTableName();
- List<String> partNames = hms.listPartitionNames(dbName, tableName, Short.MAX_VALUE);
- for (String partName : partNames) {
- Partition partObj = hms.getPartition(dbName, tableName, partName);
- Path partPath = new Path(partObj.getSd().getLocation());
- FileSystem fs = partPath.getFileSystem(conf);
- if (fs.exists(partPath)) {
- UpgradeTool.handleRenameFiles(tableObj, partPath,
- !dryRun, conf, tableObj.getSd().getBucketColsSize() > 0, null);
- }
- }
- } else {
- Path tablePath = new Path(tableObj.getSd().getLocation());
- FileSystem fs = tablePath.getFileSystem(conf);
- if (fs.exists(tablePath)) {
- UpgradeTool.handleRenameFiles(tableObj, tablePath,
- !dryRun, conf, tableObj.getSd().getBucketColsSize() > 0, null);
- }
- }
- }
-
- public static TableMigrationOption determineMigrationTypeAutomatically(Table tableObj, TableType tableType,
- String ownerName, Configuration conf, IMetaStoreClient hms, Boolean isPathOwnedByHive)
- throws IOException, MetaException, TException {
- TableMigrationOption result = TableMigrationOption.NONE;
- String msg;
- switch (tableType) {
- case MANAGED_TABLE:
- if (AcidUtils.isTransactionalTable(tableObj)) {
- // Always keep transactional tables as managed tables.
- result = TableMigrationOption.MANAGED;
- } else {
- String reason = shouldTableBeExternal(tableObj, ownerName, conf, hms, isPathOwnedByHive);
- if (reason != null) {
- LOG.debug("Converting {} to external table. {}", getQualifiedName(tableObj), reason);
- result = TableMigrationOption.EXTERNAL;
- } else {
- result = TableMigrationOption.MANAGED;
- }
- }
- break;
- case EXTERNAL_TABLE:
- msg = String.format("Table %s is already an external table, not processing.",
- getQualifiedName(tableObj));
- LOG.debug(msg);
- result = TableMigrationOption.NONE;
- break;
- default: // VIEW/MATERIALIZED_VIEW
- msg = String.format("Ignoring table %s because it has table type %s",
- getQualifiedName(tableObj), tableType);
- LOG.debug(msg);
- result = TableMigrationOption.NONE;
- break;
- }
-
- return result;
- }
-
- private static final Map<String, String> convertToExternalTableProps = new HashMap<>();
- private static final Map<String, String> convertToAcidTableProps = new HashMap<>();
- private static final Map<String, String> convertToMMTableProps = new HashMap<>();
- private static final String KUDU_LEGACY_STORAGE_HANDLER = "com.cloudera.kudu.hive.KuduStorageHandler";
- private static final String KUDU_STORAGE_HANDLER = "org.apache.hadoop.hive.kudu.KuduStorageHandler";
-
- static {
- convertToExternalTableProps.put("EXTERNAL", "TRUE");
- convertToExternalTableProps.put("external.table.purge", "true");
-
- convertToAcidTableProps.put("transactional", "true");
-
- convertToMMTableProps.put("transactional", "true");
- convertToMMTableProps.put("transactional_properties", "insert_only");
- }
-
- static boolean migrateToExternalTable(Table tableObj, TableType tableType, boolean dryRun, HiveUpdater hiveUpdater)
- throws HiveException {
- String msg;
- switch (tableType) {
- case MANAGED_TABLE:
- if (AcidUtils.isTransactionalTable(tableObj)) {
- msg = createExternalConversionExcuse(tableObj,
- "Table is a transactional table");
- LOG.debug(msg);
- return false;
- }
- LOG.info("Converting {} to external table ...", getQualifiedName(tableObj));
- if (!dryRun) {
- tableObj.setTableType(EXTERNAL_TABLE.toString());
- hiveUpdater.updateTableProperties(tableObj, convertToExternalTableProps);
- }
- return true;
- case EXTERNAL_TABLE:
- // Might need to update storage_handler
- hiveUpdater.updateTableProperties(tableObj, new HashMap<>());
- msg = createExternalConversionExcuse(tableObj,
- "Table is already an external table");
- LOG.debug(msg);
- break;
- default: // VIEW/MATERIALIZED_VIEW
- msg = createExternalConversionExcuse(tableObj,
- "Table type " + tableType + " cannot be converted");
- LOG.debug(msg);
- break;
- }
- return false;
- }
-
- static boolean canTableBeFullAcid(Table tableObj) throws MetaException {
- // Table must use an ACID-compatible file format and have no sort columns.
- return TransactionalValidationListener.conformToAcid(tableObj) &&
- (tableObj.getSd().getSortColsSize() <= 0);
- }
-
- static Map<String, String> getTablePropsForConversionToTransactional(Map<String, String> props,
- boolean convertFromExternal) {
- if (convertFromExternal) {
- // Copy the properties to a new map so we can add EXTERNAL=FALSE
- props = new HashMap<String, String>(props);
- props.put("EXTERNAL", "FALSE");
- }
- return props;
- }
-
- static boolean migrateToManagedTable(Table tableObj, TableType tableType, boolean dryRun, HiveUpdater hiveUpdater,
- IMetaStoreClient hms, Configuration conf)
- throws HiveException, IOException, MetaException, TException {
-
- boolean convertFromExternal = false;
- switch (tableType) {
- case EXTERNAL_TABLE:
- convertFromExternal = true;
- // fall through
- case MANAGED_TABLE:
- if (MetaStoreUtils.isNonNativeTable(tableObj)) {
- String msg = createManagedConversionExcuse(tableObj,
- "Table is a non-native (StorageHandler) table");
- LOG.debug(msg);
- return false;
- }
- if (HiveStrictManagedUtils.isAvroTableWithExternalSchema(tableObj)) {
- String msg = createManagedConversionExcuse(tableObj,
- "Table is an Avro table with an external schema url");
- LOG.debug(msg);
- return false;
- }
- // List bucketed table cannot be converted to transactional
- if (HiveStrictManagedUtils.isListBucketedTable(tableObj)) {
- String msg = createManagedConversionExcuse(tableObj,
- "Table is a list bucketed table");
- LOG.debug(msg);
- return false;
- }
- // If table is already transactional, no migration needed.
- if (AcidUtils.isTransactionalTable(tableObj)) {
- String msg = createManagedConversionExcuse(tableObj,
- "Table is already a transactional table");
- LOG.debug(msg);
- return false;
- }
-
- // ORC files can be converted to full acid transactional tables
- // Other formats can be converted to insert-only transactional tables
- if (canTableBeFullAcid(tableObj)) {
- // TODO: option to allow converting ORC file to insert-only transactional?
- LOG.info("Converting {} to full transactional table", getQualifiedName(tableObj));
-
- if (hiveUpdater.doFileRename) {
- renameFilesToConformToAcid(tableObj, hms, conf, dryRun);
- }
-
- if (!dryRun) {
- Map<String, String> props = getTablePropsForConversionToTransactional(
- convertToAcidTableProps, convertFromExternal);
- hiveUpdater.updateTableProperties(tableObj, props);
- }
- return true;
- } else {
- LOG.info("Converting {} to insert-only transactional table", getQualifiedName(tableObj));
- if (!dryRun) {
- Map<String, String> props = getTablePropsForConversionToTransactional(
- convertToMMTableProps, convertFromExternal);
- hiveUpdater.updateTableProperties(tableObj, props);
- }
- return true;
- }
- default: // VIEW/MATERIALIZED_VIEW
- String msg = createManagedConversionExcuse(tableObj,
- "Table type " + tableType + " cannot be converted");
- LOG.debug(msg);
- return false;
- }
- }
-
- static String shouldTableBeExternal(Table tableObj, String ownerName, Configuration conf,
- IMetaStoreClient hms, Boolean isPathOwnedByHive)
- throws IOException, MetaException, TException {
- if (MetaStoreUtils.isNonNativeTable(tableObj)) {
- return "Table is a non-native (StorageHandler) table";
- }
- if (HiveStrictManagedUtils.isAvroTableWithExternalSchema(tableObj)) {
- return "Table is an Avro table with an external schema url";
- }
- // List bucketed table cannot be converted to transactional
- if (HiveStrictManagedUtils.isListBucketedTable(tableObj)) {
- return "Table is a list bucketed table";
- }
- // If any table/partition directory is not owned by hive,
- // then assume table is using storage-based auth - set external.
- // Transactional tables should still remain transactional,
- // but we should have already checked for that before this point.
- if (isPathOwnedByHive != null) {
- // for replication flow, the path ownership must be verified at source cluster itself.
- return isPathOwnedByHive ? null :
- String.format("One or more table directories is not owned by hive or non-HDFS path at source cluster");
- } else if (shouldTablePathBeExternal(tableObj, ownerName, conf, hms)) {
- return String.format("One or more table directories are not owned by %s, or are on a non-HDFS path", ownerName);
- }
-
- return null;
- }
-
- static boolean shouldTablePathBeExternal(Table tableObj, String ownerName, Configuration conf, IMetaStoreClient hms)
- throws IOException, TException {
- boolean shouldBeExternal = false;
- String dbName = tableObj.getDbName();
- String tableName = tableObj.getTableName();
-
- if (!isPartitionedTable(tableObj)) {
- // Check the table directory.
- Path tablePath = new Path(tableObj.getSd().getLocation());
- FileSystem fs = tablePath.getFileSystem(conf);
- if (isHdfs(fs)) {
- boolean ownedByHive = checkDirectoryOwnership(fs, tablePath, ownerName, true);
- shouldBeExternal = !ownedByHive;
- } else {
- // Set non-hdfs tables to external, unless transactional (should have been checked before this).
- shouldBeExternal = true;
- }
- } else {
- // Check ownership for all partitions
- List<String> partNames = hms.listPartitionNames(dbName, tableName, Short.MAX_VALUE);
- for (String partName : partNames) {
- Partition partObj = hms.getPartition(dbName, tableName, partName);
- Path partPath = new Path(partObj.getSd().getLocation());
- FileSystem fs = partPath.getFileSystem(conf);
- if (isHdfs(fs)) {
- boolean ownedByHive = checkDirectoryOwnership(fs, partPath, ownerName, true);
- shouldBeExternal = !ownedByHive;
- } else {
- shouldBeExternal = true;
- }
- if (shouldBeExternal) {
- break;
- }
- }
- }
-
- return shouldBeExternal;
- }
-
- void cleanup() {
- hms.close();
- if (hiveUpdater != null) {
- runAndLogErrors(() -> hiveUpdater.close());
- hiveUpdater = null;
- }
- }
-
- public static HiveUpdater getHiveUpdater(HiveConf conf) throws HiveException {
- return new HiveUpdater(conf, false);
- }
-
- private static final class TxnCtx {
- public final long writeId;
- public final String validWriteIds;
- public final long txnId;
-
- public TxnCtx(long writeId, String validWriteIds, long txnId) {
- this.writeId = writeId;
- this.txnId = txnId;
- this.validWriteIds = validWriteIds;
- }
- }
-
- private static class HiveUpdater implements AutoCloseable {
- Hive hive;
- boolean doFileRename;
-
- HiveUpdater(HiveConf conf, boolean fileRename) throws HiveException {
- hive = Hive.get(conf);
- Hive.set(hive);
- doFileRename = fileRename;
- }
-
- @Override
- public void close() {
- if (hive != null) {
- runAndLogErrors(Hive::closeCurrent);
- hive = null;
- }
- }
-
- void updateDbLocation(Database db, Path newLocation) throws HiveException {
- String msg = String.format("ALTER DATABASE %s SET LOCATION '%s'", db.getName(), newLocation);
- LOG.info(msg);
-
- db.setLocationUri(newLocation.toString());
- hive.alterDatabase(db.getName(), db);
- }
-
- void updateTableLocation(Table table, Path newLocation) throws HiveException {
- String msg = String.format("ALTER TABLE %s SET LOCATION '%s'",
- getQualifiedName(table), newLocation);
- LOG.info(msg);
- boolean isTxn = TxnUtils.isTransactionalTable(table);
-
- org.apache.hadoop.hive.ql.metadata.Table modifiedTable =
- new org.apache.hadoop.hive.ql.metadata.Table(table);
- modifiedTable.setDataLocation(newLocation);
-
- alterTableInternal(isTxn, table, modifiedTable);
- }
-
- private void alterTableInternal(boolean wasTxn, Table table,
- org.apache.hadoop.hive.ql.metadata.Table modifiedTable) throws HiveException {
- IMetaStoreClient msc = getMSC();
- TxnCtx txnCtx = generateTxnCtxForAlter(table, msc, wasTxn);
- boolean isOk = false;
- try {
- String validWriteIds = null;
- if (txnCtx != null) {
- validWriteIds = txnCtx.validWriteIds;
- modifiedTable.getTTable().setWriteId(txnCtx.writeId);
- }
- msc.alter_table(table.getCatName(), table.getDbName(), table.getTableName(),
- modifiedTable.getTTable(), null, validWriteIds);
- isOk = true;
- } catch (TException ex) {
- throw new HiveException(ex);
- } finally {
- closeTxnCtx(txnCtx, msc, isOk);
- }
- }
-
- private void alterPartitionInternal(Table table,
- org.apache.hadoop.hive.ql.metadata.Partition modifiedPart) throws HiveException {
- IMetaStoreClient msc = getMSC();
- TxnCtx txnCtx = generateTxnCtxForAlter(table, msc, null);
- boolean isOk = false;
- try {
- String validWriteIds = null;
- if (txnCtx != null) {
- validWriteIds = txnCtx.validWriteIds;
- modifiedPart.getTPartition().setWriteId(txnCtx.writeId);
- }
- msc.alter_partition(table.getCatName(), table.getDbName(), table.getTableName(),
- modifiedPart.getTPartition(), null, validWriteIds);
- isOk = true;
- } catch (TException ex) {
- throw new HiveException(ex);
- } finally {
- closeTxnCtx(txnCtx, msc, isOk);
- }
- }
-
- private IMetaStoreClient getMSC() throws HiveException {
- try {
- return hive.getMSC();
- } catch (MetaException ex) {
- throw new HiveException(ex);
- }
- }
-
- private TxnCtx generateTxnCtxForAlter(
- Table table, IMetaStoreClient msc, Boolean wasTxn) throws HiveException {
- if ((wasTxn != null && !wasTxn) || !TxnUtils.isTransactionalTable(table.getParameters())) {
- return null;
- }
- try {
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- long txnId = msc.openTxn(ugi == null ? "anonymous" : ugi.getShortUserName());
- TxnCtx result = null;
- try {
- ValidTxnList txns = msc.getValidTxns(txnId);
- String fqn = table.getDbName() + "." + table.getTableName();
- List<TableValidWriteIds> writeIdsObj = msc.getValidWriteIds(
- Lists.newArrayList(fqn), txns.toString());
- String validWriteIds = TxnCommonUtils.createValidTxnWriteIdList(txnId, writeIdsObj)
- .getTableValidWriteIdList(fqn).writeToString();
- long writeId = msc.allocateTableWriteId(txnId, table.getDbName(), table.getTableName());
- result = new TxnCtx(writeId, validWriteIds, txnId);
- } finally {
- if (result == null) {
- msc.abortTxns(Lists.newArrayList(txnId));
- }
- }
- return result;
- } catch (IOException | TException ex) {
- throw new HiveException(ex);
- }
- }
-
- private void closeTxnCtx(TxnCtx txnCtx, IMetaStoreClient msc, boolean isOk)
- throws HiveException {
- if (txnCtx == null) return;
- try {
- if (isOk) {
- msc.commitTxn(txnCtx.txnId);
- } else {
- msc.abortTxns(Lists.newArrayList(txnCtx.txnId));
- }
- } catch (TException ex) {
- throw new HiveException(ex);
- }
- }
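The TxnCtx plumbing above boils down to: open a transaction, allocate a table write id, perform the alter with the resulting valid write-id list, then commit on success or abort on failure. A stripped-down sketch of that lifecycle against a metastore client, assuming a reachable metastore configured via HiveConf and using db1/tbl1 as placeholder names (the real code also fetches the valid write-id list for the alter call):

    import com.google.common.collect.Lists;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;

    public class AlterTxnSketch {
      public static void main(String[] args) throws Exception {
        IMetaStoreClient msc = new HiveMetaStoreClient(new HiveConf());
        long txnId = msc.openTxn("hive");                                  // open a txn for the alter
        boolean ok = false;
        try {
          long writeId = msc.allocateTableWriteId(txnId, "db1", "tbl1");   // placeholder table
          System.out.println("allocated write id " + writeId);
          // ... issue alter_table / alter_partition with this write id ...
          ok = true;
        } finally {
          if (ok) {
            msc.commitTxn(txnId);
          } else {
            msc.abortTxns(Lists.newArrayList(txnId));
          }
          msc.close();
        }
      }
    }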
-
- void updatePartitionLocation(String dbName, Table table, String partName,
- Partition part, Path newLocation) throws HiveException, TException {
- String msg = String.format("ALTER TABLE %s PARTITION (%s) SET LOCATION '%s'",
- getQualifiedName(table), partName, newLocation.toString());
- LOG.info(msg);
-
- org.apache.hadoop.hive.ql.metadata.Partition modifiedPart =
- new org.apache.hadoop.hive.ql.metadata.Partition(
- new org.apache.hadoop.hive.ql.metadata.Table(table),
- part);
- modifiedPart.setLocation(newLocation.toString());
- alterPartitionInternal(table, modifiedPart);
- }
-
- void updateTableProperties(Table table, Map<String, String> propsToApply) throws HiveException {
- Map<String, String> props = new HashMap<>(propsToApply);
- migrateKuduStorageHandlerType(table, props);
- StringBuilder sb = new StringBuilder();
- boolean isTxn = TxnUtils.isTransactionalTable(table);
- org.apache.hadoop.hive.ql.metadata.Table modifiedTable = doFileRename ?
- new org.apache.hadoop.hive.ql.metadata.Table(table) : null;
- if (props.size() == 0) {
- return;
- }
- boolean first = true;
- for (String key : props.keySet()) {
- String value = props.get(key);
- if (modifiedTable == null) {
- table.getParameters().put(key, value);
- } else {
- modifiedTable.getParameters().put(key, value);
- }
-
- // Build properties list for logging
- if (first) {
- first = false;
- } else {
- sb.append(", ");
- }
- sb.append("'");
- sb.append(key);
- sb.append("'='");
- sb.append(value);
- sb.append("'");
- }
- String msg = String.format("ALTER TABLE %s SET TBLPROPERTIES (%s)",
- getQualifiedName(table), sb.toString());
- LOG.info(msg);
-
- // Note: for now, this is always called to convert the table to either external, or ACID/MM,
- // so the original table would be non-txn and the transaction wouldn't be opened.
- if (modifiedTable != null) {
- alterTableInternal(isTxn, table, modifiedTable);
- }
- }
- }
-
- interface ThrowableRunnable {
- void run() throws Exception;
- }
-
- static void runAndLogErrors(ThrowableRunnable r) {
- try {
- r.run();
- } catch (Exception err) {
- LOG.error("Error encountered", err);
- }
- }
-
- static String createExternalConversionExcuse(Table tableObj, String reason) {
- return String.format("Table %s cannot be converted to an external table in "
- + "strict managed table mode for the following reason: %s",
- getQualifiedName(tableObj), reason);
- }
-
- static String createManagedConversionExcuse(Table tableObj, String reason) {
- return String.format("Table %s cannot be converted to a managed table in "
- + "strict managed table mode for the following reason: %s",
- getQualifiedName(tableObj), reason);
- }
-
- static boolean isPartitionedTable(Table tableObj) {
- List<FieldSchema> partKeys = tableObj.getPartitionKeys();
- if (partKeys != null && partKeys.size() > 0) {
- return true;
- }
- return false;
- }
-
- static boolean isHdfs(FileSystem fs) {
- return scheme.equals(fs.getScheme());
- }
-
- static String getQualifiedName(Table tableObj) {
- return getQualifiedName(tableObj.getDbName(), tableObj.getTableName());
- }
-
- static String getQualifiedName(String dbName, String tableName) {
- StringBuilder sb = new StringBuilder();
- sb.append('`');
- sb.append(dbName);
- sb.append("`.`");
- sb.append(tableName);
- sb.append('`');
- return sb.toString();
- }
-
- static boolean arePathsEqual(Configuration conf, String path1, String path2) throws IOException {
- String qualified1 = getQualifiedPath(conf, new Path(path1));
- String qualified2 = getQualifiedPath(conf, new Path(path2));
- return qualified1.equals(qualified2);
- }
-
- static String getQualifiedPath(Configuration conf, Path path) throws IOException {
- FileSystem fs;
- if (path == null) {
- return null;
- }
-
- fs = path.getFileSystem(conf);
- return fs.makeQualified(path).toString();
- }
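Path comparison in this tool is always done on fully qualified paths, so '/warehouse/x' and 'hdfs://nn:8020/warehouse/x' compare equal when they refer to the same location. A tiny sketch of that check, using local file:// paths as stand-ins:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class QualifiedPathSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path p1 = new Path("/tmp/warehouse/db.db/tbl");            // unqualified, relative to the default FS
        Path p2 = new Path("file:///tmp/warehouse/db.db/tbl");     // already carries a scheme
        String q1 = p1.getFileSystem(conf).makeQualified(p1).toString();
        String q2 = p2.getFileSystem(conf).makeQualified(p2).toString();
        System.out.println(q1.equals(q2));                         // true when both resolve to the same location
      }
    }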
-
- /**
- * Recursively check the file owner and permissions, setting them to the passed in values
- * if the owner/perms of the file do not match.
- * @param fs
- * @param path
- * @param userName Owner of the file to compare/set. Null to skip this check.
- * @param groupName Group of the file to compare/set. Null to skip this check.
- * @param dirPerms Permissions to compare/set, if the file is a directory. Null to skip this check.
- * @param filePerms Permissions to compare/set, if the file is a file. Null to skip this check.
- * @param dryRun Dry run - check but do not actually set
- * @param recurse Whether to recursively check/set the contents of a directory
- * @throws IOException
- */
- static void checkAndSetFileOwnerPermissions(FileSystem fs, Path path,
- String userName, String groupName,
- FsPermission dirPerms, FsPermission filePerms,
- boolean dryRun, boolean recurse) throws IOException {
- FileStatus fStatus = getFileStatus(fs, path);
- checkAndSetFileOwnerPermissions(fs, fStatus, userName, groupName, dirPerms, filePerms, dryRun, recurse);
- }
-
- /**
- * Recursively check the file owner and permissions, setting them to the passed in values
- * if the owner/perms of the file do not match.
- * @param fs
- * @param fStatus
- * @param userName Owner of the file to compare/set. Null to skip this check.
- * @param groupName Group of the file to compare/set. Null to skip this check.
- * @param dirPerms Permissions to compare/set, if the file is a directory. Null to skip this check.
- * @param filePerms Permissions to compare/set, if the file is a file. Null to skip this check.
- * @param dryRun Dry run - check but do not actually set
- * @param recurse Whether to recursively check/set the contents of a directory
- * @throws IOException
- */
- static void checkAndSetFileOwnerPermissions(FileSystem fs, FileStatus fStatus,
- String userName, String groupName,
- FsPermission dirPerms, FsPermission filePerms,
- boolean dryRun, boolean recurse) throws IOException {
- if (fStatus == null) {
- return;
- }
-
- Path path = fStatus.getPath();
- boolean setOwner = false;
- if (userName != null && !userName.equals(fStatus.getOwner())) {
- setOwner = true;
- } else if (groupName != null && !groupName.equals(fStatus.getGroup())) {
- setOwner = true;
- }
-
- boolean isDir = fStatus.isDirectory();
- boolean setPerms = false;
- FsPermission perms = filePerms;
- if (isDir) {
- perms = dirPerms;
- }
- if (perms != null && !perms.equals(fStatus.getPermission())) {
- setPerms = true;
- }
-
- if (setOwner) {
- LOG.debug("Setting owner/group of {} to {}/{}", path, userName, groupName);
- if (!dryRun) {
- fs.setOwner(path, userName, groupName);
- }
- }
- if (setPerms) {
- LOG.debug("Setting perms of {} to {}", path, perms);
- if (!dryRun) {
- fs.setPermission(path, perms);
- }
- }
-
- if (isDir && recurse) {
- for (FileStatus subFile : fs.listStatus(path)) {
- // TODO: Use threadpool for more concurrency?
- // TODO: check/set all files, or only directories
- checkAndSetFileOwnerPermissions(fs, subFile, userName, groupName, dirPerms, filePerms, dryRun, recurse);
- }
- }
- }
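The recursive check-and-set above only issues setOwner/setPermission when the current status differs from the desired values, which keeps dry runs and already-correct trees cheap. A single-path sketch of that compare-then-set step, with a placeholder path, owner and 700 directory permission:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class OwnerPermsSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path dir = new Path("file:///tmp/warehouse/db.db");     // placeholder directory
        FileSystem fs = dir.getFileSystem(conf);
        FileStatus status = fs.getFileStatus(dir);
        FsPermission dirPerms = new FsPermission("700");        // desired directory permissions
        if (!"hive".equals(status.getOwner())) {
          fs.setOwner(dir, "hive", null);                       // null group: leave the group unchanged
        }
        if (!dirPerms.equals(status.getPermission())) {
          fs.setPermission(dir, dirPerms);
        }
      }
    }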
-
- static boolean checkDirectoryOwnership(FileSystem fs,
- Path path,
- String userName,
- boolean recurse) throws IOException {
- FileStatus fStatus = getFileStatus(fs, path);
- return checkDirectoryOwnership(fs, fStatus, userName, recurse);
- }
-
- static boolean checkDirectoryOwnership(FileSystem fs,
- FileStatus fStatus,
- String userName,
- boolean recurse) throws IOException {
- if (fStatus == null) {
- // Non-existent file returns true.
- return true;
- }
-
- Path path = fStatus.getPath();
- boolean result = true;
-
- // Ignore non-directory files
- boolean isDir = fStatus.isDirectory();
- if (isDir) {
- if (userName != null && !userName.equals(fStatus.getOwner())) {
- return false;
- }
-
- if (recurse) {
- for (FileStatus subFile : fs.listStatus(path)) {
- if (!checkDirectoryOwnership(fs, subFile, userName, recurse)) {
- return false;
- }
- }
- }
- }
-
- return result;
- }
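checkDirectoryOwnership is what drives the managed-vs-external decision: if any directory under a table or partition path is not owned by the expected user, the table is pushed to external. A compact standalone equivalent, with a placeholder path and user; plain files are skipped just as above:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class OwnershipCheckSketch {
      // Returns false as soon as any directory under 'path' is not owned by 'userName'.
      static boolean ownedRecursively(FileSystem fs, Path path, String userName) throws IOException {
        if (!fs.exists(path)) {
          return true;                     // missing paths count as owned, like the tool
        }
        FileStatus status = fs.getFileStatus(path);
        if (!status.isDirectory()) {
          return true;                     // only directories are checked
        }
        if (!userName.equals(status.getOwner())) {
          return false;
        }
        for (FileStatus child : fs.listStatus(path)) {
          if (!ownedRecursively(fs, child.getPath(), userName)) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path tableDir = new Path("file:///tmp/warehouse/db.db/tbl");   // placeholder table directory
        FileSystem fs = tableDir.getFileSystem(conf);
        System.out.println("owned by hive: " + ownedRecursively(fs, tableDir, "hive"));
      }
    }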
-
- static FileStatus getFileStatus(FileSystem fs, Path path) throws IOException {
- if (!fs.exists(path)) {
- return null;
- }
- return fs.getFileStatus(path);
- }
-
- static FileStatus[] listStatus(FileSystem fs, Path path) throws IOException {
- if (!fs.exists(path)) {
- return null;
- }
- return fs.listStatus(path);
- }
-
- static boolean hasEquivalentEncryption(HadoopShims.HdfsEncryptionShim encryptionShim,
- Path path1, Path path2) throws IOException {
- // Assumes both are qualified paths on the same FileSystem
- if (encryptionShim.isPathEncrypted(path1) || encryptionShim.isPathEncrypted(path2)) {
- if (!encryptionShim.arePathsOnSameEncryptionZone(path1, path2)) {
- return false;
- }
- }
- return true;
- }
-
- static boolean hasEquivalentErasureCodingPolicy(HadoopShims.HdfsErasureCodingShim ecShim,
- Path path1, Path path2) throws IOException {
- HadoopShims.HdfsFileErasureCodingPolicy policy1 = ecShim.getErasureCodingPolicy(path1);
- HadoopShims.HdfsFileErasureCodingPolicy policy2 = ecShim.getErasureCodingPolicy(path2);
- if (policy1 != null) {
- return policy1.equals(policy2);
- }
- return policy2 == null;
- }
-
- /**
- * While upgrading from earlier versions we need to amend the storage_handler value of Kudu tables that might
- * still have the legacy handler class set.
- * @param table
- * @param props
- */
- private static void migrateKuduStorageHandlerType(Table table, Map<String, String> props) {
- Map<String, String> tableProperties = table.getParameters();
- if (tableProperties != null) {
- String storageHandler = tableProperties.get(META_TABLE_STORAGE);
- if (KUDU_LEGACY_STORAGE_HANDLER.equals(storageHandler)) {
- props.put(META_TABLE_STORAGE, KUDU_STORAGE_HANDLER);
- }
- }
- }
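The Kudu fix-up above rewrites a single table property: the legacy com.cloudera handler class is mapped to the Apache class under the storage_handler key. A trivial standalone illustration; META_TABLE_STORAGE is hard-coded here to its 'storage_handler' value instead of being imported from the metastore constants:

    import java.util.HashMap;
    import java.util.Map;

    public class KuduHandlerRemapSketch {
      private static final String META_TABLE_STORAGE = "storage_handler";
      private static final String LEGACY = "com.cloudera.kudu.hive.KuduStorageHandler";
      private static final String CURRENT = "org.apache.hadoop.hive.kudu.KuduStorageHandler";

      public static void main(String[] args) {
        Map<String, String> tableProps = new HashMap<>();
        tableProps.put(META_TABLE_STORAGE, LEGACY);              // a table created before the upgrade

        Map<String, String> propsToApply = new HashMap<>();
        if (LEGACY.equals(tableProps.get(META_TABLE_STORAGE))) {
          propsToApply.put(META_TABLE_STORAGE, CURRENT);         // remap to the new handler class
        }
        System.out.println(propsToApply);
      }
    }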
-
- private static FileSystem getFS(Path path, Configuration conf, UserGroupInformation fsOperationUser)
- throws IOException {
- try {
- return fsOperationUser.doAs(new PrivilegedExceptionAction<FileSystem>() {
- @Override
- public FileSystem run() throws Exception {
- return path.getFileSystem(conf);
- }
- });
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
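All FileSystem handles in the tool are obtained under a designated fs operation user via UserGroupInformation.doAs, so renames and chown/chmod calls run with that identity rather than with whoever launched the tool. A minimal sketch using createRemoteUser with a placeholder user name; a kerberized cluster would use a login or proxy UGI instead:

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;

    public class DoAsSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("file:///tmp/warehouse");                  // placeholder path
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hive");
        FileSystem fs = ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> path.getFileSystem(conf));
        System.out.println(fs.getUri());
      }
    }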
-
- /**
- * Can be set from tests when the config needs something other than the default values.
- */
- @VisibleForTesting
- static HiveConf hiveConf = null;
- @VisibleForTesting
- static String scheme = "hdfs";
-}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/util/TestRetryable.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/util/TestRetryable.java
index 986e508..9d0e579 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/util/TestRetryable.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/util/TestRetryable.java
@@ -84,7 +84,7 @@ public class TestRetryable {
@Test
public void testRetrySuccessValidExceptionList() throws Throwable {
Retryable retryable = Retryable.builder()
- .withTotalDuration(30)
+ .withTotalDuration(60)
.withInitialDelay(1)
.withBackoff(1.0)
.withRetryOnExceptionList(Arrays.asList(NullPointerException.class, IOException.class)).build();
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/util/TestHiveStrictManagedMigration.java b/ql/src/test/org/apache/hadoop/hive/ql/util/TestHiveStrictManagedMigration.java
deleted file mode 100644
index a6862a8..0000000
--- a/ql/src/test/org/apache/hadoop/hive/ql/util/TestHiveStrictManagedMigration.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.util;
-
-import static java.util.stream.Collectors.toSet;
-import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.Table.ACIDTBL;
-import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.Table.ACIDTBLPART;
-import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.Table.NONACIDNONBUCKET;
-import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.Table.NONACIDORCTBL;
-import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.Table.NONACIDORCTBL2;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
-import org.apache.hadoop.hive.ql.TxnCommandsBaseForTests;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-
-import com.google.common.collect.ImmutableMap;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestHiveStrictManagedMigration extends TxnCommandsBaseForTests {
- private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
- File.separator + TestHiveStrictManagedMigration.class.getCanonicalName() + "-" + System.currentTimeMillis()
- ).getPath().replaceAll("\\\\", "/");
- private static final String EXTERNAL_TABLE_LOCATION = new File(TEST_DATA_DIR, "tmp").getPath();
-
- @Test
- public void testUpgrade() throws Exception {
- int[][] data = {{1, 2}, {3, 4}, {5, 6}};
- runStatementOnDriver("DROP TABLE IF EXISTS test.TAcid");
- runStatementOnDriver("DROP DATABASE IF EXISTS test");
-
- runStatementOnDriver("CREATE DATABASE test");
- runStatementOnDriver(
- "CREATE TABLE test.TAcid (a int, b int) CLUSTERED BY (b) INTO 2 BUCKETS STORED AS orc TBLPROPERTIES" +
- " ('transactional'='true')");
- runStatementOnDriver("INSERT INTO test.TAcid" + makeValuesClause(data));
-
- runStatementOnDriver(
- "CREATE EXTERNAL TABLE texternal (a int, b int)");
-
- // Case for table having null location
- runStatementOnDriver("CREATE EXTERNAL TABLE test.sysdbtest(tbl_id bigint)");
- org.apache.hadoop.hive.ql.metadata.Table table = Hive.get(hiveConf).getTable("test", "sysdbtest");
- table.getSd().unsetLocation();
- Hive.get(hiveConf).alterTable(table, false,
- new EnvironmentContext(ImmutableMap.of(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE)), false);
-
- String oldWarehouse = getWarehouseDir();
- String[] args = {"--hiveconf", "hive.strict.managed.tables=true", "-m", "automatic", "--modifyManagedTables",
- "--oldWarehouseRoot", oldWarehouse};
- HiveConf newConf = new HiveConf(hiveConf);
- File newWarehouseDir = new File(getTestDataDir(), "newWarehouse");
- newConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, newWarehouseDir.getAbsolutePath());
- newConf.set("strict.managed.tables.migration.owner", System.getProperty("user.name"));
- runMigrationTool(newConf, args);
-
- Assert.assertTrue(newWarehouseDir.exists());
- Assert.assertTrue(new File(newWarehouseDir, ACIDTBL.toString().toLowerCase()).exists());
- Assert.assertTrue(new File(newWarehouseDir, ACIDTBLPART.toString().toLowerCase()).exists());
- Assert.assertTrue(new File(newWarehouseDir, NONACIDNONBUCKET.toString().toLowerCase()).exists());
- Assert.assertTrue(new File(newWarehouseDir, NONACIDORCTBL.toString().toLowerCase()).exists());
- Assert.assertTrue(new File(newWarehouseDir, NONACIDORCTBL2.toString().toLowerCase()).exists());
- Assert.assertTrue(new File(new File(newWarehouseDir, "test.db"), "tacid").exists());
- Assert.assertTrue(new File(oldWarehouse, "texternal").exists());
-
- // Tear down
- runStatementOnDriver("drop database test cascade");
- Database defaultDb = Hive.get().getDatabase("default");
- defaultDb.setLocationUri(oldWarehouse);
- Hive.get().alterDatabase("default", defaultDb);
- System.setProperty("hive.strict.managed.tables", "false");
- }
-
- /**
- * Tests shouldMoveExternal option on all possible scenarios of the following dimensions:
- * - managed or external table type?
- * - location in (old) warehouse or truly external location?
- * - is partitioned?
- * - is partition location default (under table directory) or custom external path?
- * - default or custom database?
- * @throws Exception
- */
- @Test
- public void testExternalMove() throws Exception {
- setupExternalTableTest();
- String oldWarehouse = getWarehouseDir();
- String[] args = {"-m", "external", "--shouldMoveExternal", "--tableRegex", "man.*|ext.*|custm.*|custe.*",
- "--oldWarehouseRoot", oldWarehouse};
- HiveConf newConf = new HiveConf(hiveConf);
- File newManagedWarehouseDir = new File(getTestDataDir(), "newManaged");
- File newExtWarehouseDir = new File(getTestDataDir(), "newExternal");
- newConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, newManagedWarehouseDir.getAbsolutePath());
- newConf.set(HiveConf.ConfVars.HIVE_METASTORE_WAREHOUSE_EXTERNAL.varname, newExtWarehouseDir.getAbsolutePath());
- runMigrationTool(newConf, args);
- Assert.assertTrue(newExtWarehouseDir.exists());
- assertExternalTableLocations(newExtWarehouseDir, new File(EXTERNAL_TABLE_LOCATION));
- assertSDLocationCorrect();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testExternalMoveFailsForIncorrectOptions() throws Throwable {
- try {
- String[] args = {"-m", "automatic", "--shouldMoveExternal"};
- runMigrationTool(new HiveConf(hiveConf), args);
- } catch (Exception e) {
- // Exceptions are re-packaged by the migration tool...
- throw e.getCause();
- }
- }
-
- /**
- * Should encounter a DB with an unset owner, and should try to chown the new dir path to the 'hive' user.
- * This will always fail in this test, as we're never running it as root.
- * @throws Exception
- */
- @Test(expected = AssertionError.class)
- public void testExtDbDirOnFsIsCreatedAsHiveIfDbOwnerNull() throws Exception {
- runStatementOnDriver("drop database if exists ownerlessdb");
- runStatementOnDriver("create database ownerlessdb");
- Database db = Hive.get().getDatabase("ownerlessdb");
- db.setOwnerName(null);
- Hive.get().alterDatabase("ownerlessdb", db);
-
- String[] args = {"-m", "external"};
- HiveConf newConf = new HiveConf(hiveConf);
- File newExtWarehouseDir = new File(getTestDataDir(), "newExternal");
- newConf.set(HiveConf.ConfVars.HIVE_METASTORE_WAREHOUSE_EXTERNAL.varname, newExtWarehouseDir.getAbsolutePath());
- runMigrationTool(newConf, args);
- }
-
- @Override
- protected String getTestDataDir() {
- return TEST_DATA_DIR;
- }
-
-
- private static void runMigrationTool(HiveConf hiveConf, String[] args) throws Exception {
- HiveStrictManagedMigration.hiveConf = hiveConf;
- HiveStrictManagedMigration.scheme = "file";
- HiveStrictManagedMigration.main(args);
- if (HiveStrictManagedMigration.RC != 0) {
- fail("HiveStrictManagedMigration failed with error(s)");
- }
- }
-
- private void setupExternalTableTest() throws Exception {
- runStatementOnDriver("drop table if exists manwhnone");
- runStatementOnDriver("drop table if exists manoutnone");
- runStatementOnDriver("drop table if exists manwhwh");
- runStatementOnDriver("drop table if exists manwhout");
- runStatementOnDriver("drop table if exists manwhmixed");
- runStatementOnDriver("drop table if exists manoutout");
- runStatementOnDriver("drop table if exists extwhnone");
- runStatementOnDriver("drop table if exists extoutnone");
- runStatementOnDriver("drop table if exists extwhwh");
- runStatementOnDriver("drop table if exists extwhout");
- runStatementOnDriver("drop table if exists extwhmixed");
- runStatementOnDriver("drop table if exists extoutout");
- runStatementOnDriver("drop table if exists custdb.custmanwhwh");
- runStatementOnDriver("drop table if exists custdb.custextwhwh");
- runStatementOnDriver("create table manwhnone (a string)");
- runStatementOnDriver("create table manoutnone (a string) location '" + EXTERNAL_TABLE_LOCATION
- + "/manoutnone'");
- runStatementOnDriver("create table manwhwh (a string) partitioned by (p string)");
- runStatementOnDriver("alter table manwhwh add partition (p='p1')");
- runStatementOnDriver("alter table manwhwh add partition (p='p2')");
- runStatementOnDriver("create table manwhout (a string) partitioned by (p string)");
- runStatementOnDriver("alter table manwhout add partition (p='p1') location '" + EXTERNAL_TABLE_LOCATION
- + "/manwhoutp1'");
- runStatementOnDriver("alter table manwhout add partition (p='p2') location '" + EXTERNAL_TABLE_LOCATION
- + "/manwhoutp2'");
- runStatementOnDriver("create table manwhmixed (a string) partitioned by (p string)");
- runStatementOnDriver("alter table manwhmixed add partition (p='p1') location '" + EXTERNAL_TABLE_LOCATION
- + "/manwhmixedp1'");
- runStatementOnDriver("alter table manwhmixed add partition (p='p2')");
- runStatementOnDriver("create table manoutout (a string) partitioned by (p string) location '" +
- EXTERNAL_TABLE_LOCATION + "/manoutout'");
- runStatementOnDriver("alter table manoutout add partition (p='p1')");
- runStatementOnDriver("alter table manoutout add partition (p='p2')");
- runStatementOnDriver("create external table extwhnone (a string)");
- runStatementOnDriver("create external table extoutnone (a string) location '" + EXTERNAL_TABLE_LOCATION
- + "/extoutnone'");
- runStatementOnDriver("create external table extwhwh (a string) partitioned by (p string)");
- runStatementOnDriver("alter table extwhwh add partition (p='p1')");
- runStatementOnDriver("alter table extwhwh add partition (p='p2')");
- runStatementOnDriver("create external table extwhout (a string) partitioned by (p string)");
- runStatementOnDriver("alter table extwhout add partition (p='p1') location '" + EXTERNAL_TABLE_LOCATION
- + "/extwhoutp1'");
- runStatementOnDriver("alter table extwhout add partition (p='p2') location '" + EXTERNAL_TABLE_LOCATION
- + "/extwhoutp2'");
- runStatementOnDriver("create external table extwhmixed (a string) partitioned by (p string)");
- runStatementOnDriver("alter table extwhmixed add partition (p='p1') location '" + EXTERNAL_TABLE_LOCATION
- + "/extwhmixedp1'");
- runStatementOnDriver("alter table extwhmixed add partition (p='p2')");
- runStatementOnDriver("create external table extoutout (a string) partitioned by (p string) location '"
- + EXTERNAL_TABLE_LOCATION + "/extoutout'");
- runStatementOnDriver("alter table extoutout add partition (p='p1')");
- runStatementOnDriver("alter table extoutout add partition (p='p2')");
- runStatementOnDriver("drop database if exists custdb");
- runStatementOnDriver("create database custdb");
- runStatementOnDriver("create table custdb.custmanwhwh (a string) partitioned by (p string)");
- runStatementOnDriver("alter table custdb.custmanwhwh add partition (p='p1')");
- runStatementOnDriver("alter table custdb.custmanwhwh add partition (p='p2')");
- runStatementOnDriver("create external table custdb.custextwhwh (a string) partitioned by (p string)");
- runStatementOnDriver("alter table custdb.custextwhwh add partition (p='p1')");
- runStatementOnDriver("alter table custdb.custextwhwh add partition (p='p2')");
- }
-
- private static void assertExternalTableLocations(File externalWarehouseDir, File externalNonWhDir)
- throws IOException {
- Set<String> actualDirs = Files.find(Paths.get(externalWarehouseDir.toURI()), Integer.MAX_VALUE, (p, a)->true)
- .map(p->p.toString().replaceAll(externalWarehouseDir.getAbsolutePath(), ""))
- .filter(s->!s.isEmpty()).collect(toSet());
- Set<String> expectedDirs = new HashSet<>();
- expectedDirs.add("/extwhwh");
- expectedDirs.add("/extwhwh/p=p2");
- expectedDirs.add("/extwhwh/p=p1");
- expectedDirs.add("/extwhmixed");
- expectedDirs.add("/extwhmixed/p=p2");
- expectedDirs.add("/manwhwh");
- expectedDirs.add("/manwhwh/p=p2");
- expectedDirs.add("/manwhwh/p=p1");
- expectedDirs.add("/custdb.db");
- expectedDirs.add("/custdb.db/custmanwhwh");
- expectedDirs.add("/custdb.db/custmanwhwh/p=p2");
- expectedDirs.add("/custdb.db/custmanwhwh/p=p1");
- expectedDirs.add("/custdb.db/custextwhwh");
- expectedDirs.add("/custdb.db/custextwhwh/p=p2");
- expectedDirs.add("/custdb.db/custextwhwh/p=p1");
- expectedDirs.add("/manwhout");
- expectedDirs.add("/manwhnone");
- expectedDirs.add("/manwhmixed");
- expectedDirs.add("/manwhmixed/p=p2");
- expectedDirs.add("/extwhnone");
- expectedDirs.add("/extwhout");
- assertEquals("Unexpected external warehouse directory structure in " + exteralWarehouseDir, expectedDirs,
- actualDirs);
-
- actualDirs = Files.find(Paths.get(externalNonWhDir.toURI()), Integer.MAX_VALUE, (p, a)->true)
- .map(p->p.toString().replaceAll(externalNonWhDir.getAbsolutePath(), ""))
- .filter(s->!s.isEmpty()).collect(toSet());
- expectedDirs.clear();
- expectedDirs.add("/manoutout");
- expectedDirs.add("/extoutout/p=p2");
- expectedDirs.add("/extoutout/p=p1");
- expectedDirs.add("/extwhoutp2");
- expectedDirs.add("/extwhoutp1");
- expectedDirs.add("/manwhmixedp1");
- expectedDirs.add("/manwhoutp1");
- expectedDirs.add("/manoutout/p=p1");
- expectedDirs.add("/manoutout/p=p2");
- expectedDirs.add("/manwhoutp2");
- expectedDirs.add("/extoutnone");
- expectedDirs.add("/manoutnone");
- expectedDirs.add("/extoutout");
- expectedDirs.add("/extwhmixedp1");
- assertEquals("Unexpected external (non-warehouse) directory structure in " + externalNonWhDir, expectedDirs,
- actualDirs);
- }
-
- private static void assertSDLocationCorrect() throws HiveException {
- org.apache.hadoop.hive.ql.metadata.Table table = Hive.get().getTable("manwhwh");
- List<Partition> partitions = Hive.get().getPartitions(table);
- assertTrue(partitions.get(0).getLocation().contains("/newExternal/manwhwh/p=p1"));
- assertTrue(partitions.get(1).getLocation().contains("/newExternal/manwhwh/p=p2"));
-
- table = Hive.get().getTable("manwhout");
- partitions = Hive.get().getPartitions(table);
- assertTrue(partitions.get(0).getLocation().contains("/tmp/manwhoutp1"));
- assertTrue(partitions.get(1).getLocation().contains("/tmp/manwhoutp2"));
-
- table = Hive.get().getTable("manwhmixed");
- partitions = Hive.get().getPartitions(table);
- assertTrue(partitions.get(0).getLocation().contains("/tmp/manwhmixedp1"));
- assertTrue(partitions.get(1).getLocation().contains("/newExternal/manwhmixed/p=p2"));
- }
-}
diff --git a/ql/src/test/queries/clientpositive/repl_2_exim_basic.q b/ql/src/test/queries/clientpositive/repl_2_exim_basic.q
index 4c18223..3d5961b 100644
--- a/ql/src/test/queries/clientpositive/repl_2_exim_basic.q
+++ b/ql/src/test/queries/clientpositive/repl_2_exim_basic.q
@@ -4,6 +4,7 @@ set hive.test.mode.prefix=;
set hive.test.mode.nosamplelist=managed_t,ext_t,managed_t_imported,managed_t_r_imported,ext_t_imported,ext_t_r_imported;
set hive.repl.include.external.tables=true;
set hive.repl.dump.metadata.only.for.external.table=false;
+set hive.repl.data.copy.lazy=false;
drop table if exists managed_t;
drop table if exists ext_t;
diff --git a/ql/src/test/queries/clientpositive/repl_3_exim_metadata.q b/ql/src/test/queries/clientpositive/repl_3_exim_metadata.q
index 08130b6..4697361 100644
--- a/ql/src/test/queries/clientpositive/repl_3_exim_metadata.q
+++ b/ql/src/test/queries/clientpositive/repl_3_exim_metadata.q
@@ -2,6 +2,7 @@ set hive.mapred.mode=nonstrict;
set hive.test.mode=true;
set hive.test.mode.prefix=;
set hive.test.mode.nosamplelist=replsrc,repldst,repldst_md;
+set hive.repl.data.copy.lazy=false;
drop table if exists replsrc;
drop table if exists repldst;
diff --git a/ql/src/test/queries/clientpositive/repl_4_exim_nocolstat.q b/ql/src/test/queries/clientpositive/repl_4_exim_nocolstat.q
index 4b4ad07..4bb172e 100644
--- a/ql/src/test/queries/clientpositive/repl_4_exim_nocolstat.q
+++ b/ql/src/test/queries/clientpositive/repl_4_exim_nocolstat.q
@@ -4,6 +4,7 @@ set hive.test.mode.prefix=;
set hive.test.mode.nosamplelist=replsrc,repldst;
set metastore.try.direct.sql=false;
set hive.metastore.rawstore.impl=org.apache.hadoop.hive.metastore.ObjectStore;
+set hive.repl.data.copy.lazy=false;
drop table if exists replsrc;
drop table if exists repldst;
diff --git a/testutils/ptest2/conf/deployed/master-mr2.properties b/testutils/ptest2/conf/deployed/master-mr2.properties
index 5a9b3ec..75b6d72 100644
--- a/testutils/ptest2/conf/deployed/master-mr2.properties
+++ b/testutils/ptest2/conf/deployed/master-mr2.properties
@@ -74,13 +74,12 @@ ut.itests.hive-unit.skipBatching=TestAcidOnTezWithSplitUpdate TestAcidOnTez Test
TestReplWithJsonMessageFormat TestReplWithJsonMessageFormat \
TestStatsReplicationScenariosACIDNoAutogather TestStatsReplicationScenariosMMNoAutogather \
TestStatsReplicationScenariosACID TestStatsReplicationScenariosMM \
- TestStatsReplicationScenariosMigrationNoAutogather TestStatsReplicationScenariosMigration \
TestStatsReplicationScenariosNoAutogather TestStatsReplicationScenarios \
TestReplicationScenariosAcidTables TestReplAcidTablesBootstrapWithJsonMessage \
TestReplicationScenariosAcidTablesBootstrap TestReplAcidTablesWithJsonMessage \
TestTableLevelReplicationScenarios \
- TestReplicationScenariosExternalTables TestReplAcrossInstancesWithJsonMessageFormat TestReplTableMigrationWithJsonFormat \
- TestReplicationWithTableMigration TestReplicationWithTableMigrationEx TestReplicationOnHDFSEncryptedZonesTestReplAcidTablesWithJsonMessage \
+ TestReplicationScenariosExternalTables TestReplAcrossInstancesWithJsonMessageFormat \
+ TestReplicationOnHDFSEncryptedZonesTestReplAcidTablesWithJsonMessage \
TestReplicationScenariosAcrossInstances TestReplicationOfHiveStreaming TestReplScenariosWithStrictManaged TestReplChangeManager
unitTests.module.itests.qtest=itests.qtest