Posted to commits@hive.apache.org by pr...@apache.org on 2021/12/17 04:55:37 UTC
[hive] branch master updated: HIVE-25708: Implement creation of table_diff. (Ayush Saxena, reviewed by Pravin Kumar Sinha)
This is an automated email from the ASF dual-hosted git repository.
pravin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a07c3cc HIVE-25708: Implement creation of table_diff. (Ayush Saxena, reviewed by Pravin Kumar Sinha)
a07c3cc is described below
commit a07c3ccb1bff733ff41f6c38947f406aebc11353
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Fri Dec 17 10:25:18 2021 +0530
HIVE-25708: Implement creation of table_diff. (Ayush Saxena, reviewed by Pravin Kumar Sinha)
---
.../java/org/apache/hadoop/hive/ql/ErrorMsg.java | 3 +-
.../parse/TestReplicationOptimisedBootstrap.java | 453 +++++++++++++++++++++
.../parse/TestReplicationScenariosAcidTables.java | 24 --
.../TestReplicationScenariosAcrossInstances.java | 7 +-
.../hadoop/hive/ql/parse/WarehouseInstance.java | 4 +-
.../hive/ql/exec/repl/OptimisedBootstrapUtils.java | 294 +++++++++++++
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 82 +++-
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 29 ++
.../hadoop/hive/ql/exec/repl/ReplLoadWork.java | 4 +
.../hive/ql/parse/ReplicationSemanticAnalyzer.java | 4 +-
10 files changed, 860 insertions(+), 44 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 6220175..927650f 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -633,7 +633,8 @@ public enum ErrorMsg {
REPL_DISTCP_SNAPSHOT_EXCEPTION(40015, "SNAPSHOT_ERROR", true),
RANGER_AUTHORIZATION_FAILED(40016, "Authorization Failure while communicating to Ranger admin", true),
RANGER_AUTHENTICATION_FAILED(40017, "Authentication Failure while communicating to Ranger admin", true),
- REPL_INCOMPATIBLE_EXCEPTION(40018, "Cannot load into database {0} as it is replication incompatible.", true)
+ REPL_INCOMPATIBLE_EXCEPTION(40018, "Cannot load into database {0} as it is replication incompatible.", true),
+ REPL_FAILOVER_TARGET_MODIFIED(40019, "Database event id changed post table diff generation.")
;
private int errorCode;
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
new file mode 100644
index 0000000..d5b819d
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInstances {
+
+ String extraPrimaryDb;
+
+ @BeforeClass
+ public static void classLevelSetup() throws Exception {
+ HashMap<String, String> overrides = new HashMap<>();
+ overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+ GzipJSONMessageEncoder.class.getCanonicalName());
+ overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
+ overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
+ overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, UserGroupInformation.getCurrentUser().getUserName());
+ overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "true");
+
+ internalBeforeClassSetupExclusiveReplica(overrides, overrides, TestReplicationOptimisedBootstrap.class);
+ }
+
+ @Before
+ public void setup() throws Throwable {
+ super.setup();
+ extraPrimaryDb = "extra_" + primaryDbName;
+ }
+
+ @After
+ public void tearDown() throws Throwable {
+ primary.run("drop database if exists " + extraPrimaryDb + " cascade");
+ super.tearDown();
+ }
+
+ @Test
+ public void testBuildTableDiffGeneration() throws Throwable {
+ List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+ // Create two external & two managed tables and do a bootstrap dump & load.
+ WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+ .run("create external table t1 (id int)")
+ .run("insert into table t1 values (1)")
+ .run("insert into table t1 values (2),(3),(4)")
+ .run("create external table t2 (place string) partitioned by (country string)")
+ .run("insert into table t2 partition(country='india') values ('chennai')")
+ .run("insert into table t2 partition(country='us') values ('new york')")
+ .run("create table t1_managed (id int)")
+ .run("insert into table t1_managed values (10)")
+ .run("insert into table t1_managed values (20),(31),(42)")
+ .run("create table t2_managed (place string) partitioned by (country string)")
+ .run("insert into table t2_managed partition(country='india') values ('bangalore')")
+ .run("insert into table t2_managed partition(country='us') values ('austin')")
+ .dump(primaryDbName, withClause);
+
+ // Do the bootstrap load and check all the external & managed tables are present.
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyResult("t1")
+ .run("show tables like 't2'")
+ .verifyResult("t2")
+ .run("show tables like 't1_managed'")
+ .verifyResult("t1_managed")
+ .run("show tables like 't2_managed'")
+ .verifyResult("t2_managed")
+ .verifyReplTargetProperty(replicatedDbName);
+
+ // Do an incremental dump & load. Add one table which we can drop, and an empty table as well.
+ tuple = primary.run("use " + primaryDbName)
+ .run("create table t5_managed (id int)")
+ .run("insert into table t5_managed values (110)")
+ .run("insert into table t5_managed values (110)")
+ .run("create table t6_managed (id int)")
+ .dump(primaryDbName, withClause);
+
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't5_managed'")
+ .verifyResult("t5_managed")
+ .run("show tables like 't6_managed'")
+ .verifyResult("t6_managed")
+ .verifyReplTargetProperty(replicatedDbName);
+
+ // Do some modifications on another database with similar table names & some modifications on the original source
+ // cluster.
+ primary.run("create database " + extraPrimaryDb)
+ .run("use " + extraPrimaryDb)
+ .run("create external table t1 (id int)")
+ .run("create table t1_managed (id int)")
+ .run("use " + primaryDbName)
+ .run("create external table t4 (id int)")
+ .run("insert into table t4 values (100)")
+ .run("insert into table t4 values (201)")
+ .run("create table t4_managed (id int)")
+ .run("insert into table t4_managed values (110)")
+ .run("insert into table t4_managed values (220)")
+ .run("insert into table t2 partition(country='france') values ('lyon')")
+ .run("insert into table t2_managed partition(country='france') values ('nice')")
+ .run("alter table t6_managed add columns (name string)")
+ .run("drop table t5_managed");
+
+ // Prepare for reverse replication.
+ DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+ Path newReplDir = new Path(replica.repldDir + "1");
+ replicaFs.mkdirs(newReplDir);
+ withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+ // Do a reverse dump
+ tuple = replica.dump(replicatedDbName, withClause);
+
+ // Check the event ack file got created.
+ assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+ // Check that if the dump isn't consumed and we attempt a dump again, it gets skipped and the dump directory
+ // doesn't change, without any errors.
+
+ Path dumpPath = new Path(tuple.dumpLocation);
+ ContentSummary beforeContentSummary = replicaFs.getContentSummary(dumpPath.getParent());
+ WarehouseInstance.Tuple emptyTuple = replica.dump(replicatedDbName, withClause);
+ assertTrue(emptyTuple.dumpLocation.isEmpty());
+ assertTrue(emptyTuple.lastReplicationId.isEmpty());
+ ContentSummary afterContentSummary = replicaFs.getContentSummary(dumpPath.getParent());
+ assertEquals(beforeContentSummary.getFileAndDirectoryCount(), afterContentSummary.getFileAndDirectoryCount());
+
+ // Check the event ack file stays intact, despite having a skipped dump.
+ assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString(),
+ replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+ // Do a load; this should create a table_diff_complete directory.
+ primary.load(primaryDbName, replicatedDbName, withClause);
+
+ // Check the table diff directory exists.
+ assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+ // Check the table diff has all the modified tables, including the dropped and empty ones.
+ HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath, conf);
+ assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+ .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", "t5_managed", "t6_managed")));
+
+ // Do a load again and see that nothing changes, as this load isn't consumed.
+ beforeContentSummary = replicaFs.getContentSummary(dumpPath.getParent());
+ primary.load(primaryDbName, replicatedDbName, withClause);
+ assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+ .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", "t5_managed", "t6_managed")));
+ afterContentSummary = replicaFs.getContentSummary(dumpPath.getParent());
+ assertEquals(beforeContentSummary.getFileAndDirectoryCount(), afterContentSummary.getFileAndDirectoryCount());
+
+ // Check there are entries in the table files.
+ assertFalse(getPathsFromTableFile("t4", dumpPath, conf).isEmpty());
+ assertFalse(getPathsFromTableFile("t2", dumpPath, conf).isEmpty());
+ assertFalse(getPathsFromTableFile("t4_managed", dumpPath, conf).isEmpty());
+ assertFalse(getPathsFromTableFile("t2_managed", dumpPath, conf).isEmpty());
+
+ // Check the dropped and empty tables.
+ assertTrue(getPathsFromTableFile("t5_managed", dumpPath, conf).isEmpty());
+ assertTrue(getPathsFromTableFile("t6_managed", dumpPath, conf).size() == 1);
+ }
+
+ @Test
+ public void testEmptyDiffForControlFailover() throws Throwable {
+
+ // In case of controlled failover both A & B will be in sync, so the table diff should be created empty, without
+ // any error.
+ List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+ // Do a bootstrap cycle (A->B).
+ primary.dump(primaryDbName, withClause);
+ replica.load(replicatedDbName, primaryDbName, withClause);
+
+ // Add some tables & do an incremental dump.
+ WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+ .run("create external table t1 (id int)")
+ .run("insert into table t1 values (100),(200)")
+ .run("insert into table t1 values (12),(35),(46)")
+ .run("create table t1_managed (id int)")
+ .run("insert into table t1_managed values (120)")
+ .run("insert into table t1_managed values (10),(321),(423)")
+ .dump(primaryDbName, withClause);
+
+ // Do an incremental load and see all the tables are there.
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyResult("t1")
+ .run("show tables like 't1_managed'")
+ .verifyResult("t1_managed");
+
+ // Trigger the reverse cycle. Do a dump on the target cluster.
+ DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+ Path newReplDir = new Path(replica.repldDir + "rev");
+ replicaFs.mkdirs(newReplDir);
+ withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+ // Do a reverse dump
+ tuple = replica.dump(replicatedDbName, withClause);
+
+ // Even though there is no diff, the event ack file should be created.
+ assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+ // Do a reverse load.
+ primary.load(primaryDbName, replicatedDbName, withClause);
+
+ // Check the table diff directory still gets created.
+ assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+ // Check the table diff is empty, since we are in sync and no tables got modified.
+ HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new Path(tuple.dumpLocation), conf);
+ assertEquals("Table diff is not empty, contains :" + tableDiffEntries, 0, tableDiffEntries.size());
+ }
+
+ @Test
+ public void testFirstIncrementalMandatory() throws Throwable {
+ List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+ // Create one external and one managed table and do a bootstrap dump.
+ WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+ .run("create external table t1 (id int)")
+ .run("insert into table t1 values (1)")
+ .run("insert into table t1 values (2),(3),(4)")
+ .run("create table t1_managed (id int)")
+ .run("insert into table t1_managed values (10)")
+ .run("insert into table t1_managed values (20),(31),(42)")
+ .dump(primaryDbName, withClause);
+
+ // Do a bootstrap load and check both managed and external tables are loaded.
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyResult("t1")
+ .run("show tables like 't1_managed'")
+ .verifyResult("t1_managed")
+ .verifyReplTargetProperty(replicatedDbName);
+
+ // Trigger reverse dump just after the bootstrap cycle.
+ DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+ Path newReplDir = new Path(replica.repldDir + "1");
+ replicaFs.mkdirs(newReplDir);
+ withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+ // Do a dump on cluster B; it should throw an exception, since the first incremental isn't done yet.
+ try {
+ replica.dump(replicatedDbName, withClause);
+ fail("Expected dump to fail since the first incremental isn't done yet.");
+ } catch (HiveException he) {
+ assertTrue(he.getMessage()
+ .contains("Replication dump not allowed for replicated database with first incremental dump pending : " + replicatedDbName));
+ }
+
+ // Do an incremental cycle and check we don't get this exception.
+ withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+ primary.dump(primaryDbName, withClause);
+ replica.load(replicatedDbName, primaryDbName, withClause);
+
+ // Retrigger the reverse dump; this time it should be successful and the event ack file should get created.
+ withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+ tuple = replica.dump(replicatedDbName, withClause);
+
+ // Check the event ack file got created.
+ assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+ }
+
+ @Test
+ public void testFailureCasesInTableDiffGeneration() throws Throwable {
+ List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+ // Do a bootstrap cycle (A->B).
+ primary.dump(primaryDbName, withClause);
+ replica.load(replicatedDbName, primaryDbName, withClause);
+
+ // Add some tables & do an incremental dump.
+ WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+ .run("create external table t1 (id int)")
+ .run("insert into table t1 values (1)")
+ .run("create table t1_managed (id string)")
+ .run("insert into table t1_managed values ('A')")
+ .dump(primaryDbName, withClause);
+
+ // Do an incremental load and check the tables are there.
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyResult("t1")
+ .run("show tables like 't1_managed'")
+ .verifyResult("t1_managed")
+ .verifyReplTargetProperty(replicatedDbName);
+
+ // Do some modifications on the source cluster, so we have some entries in the table diff.
+ primary.run("use " + primaryDbName)
+ .run("create table t2_managed (id string)")
+ .run("insert into table t1_managed values ('S')")
+ .run("insert into table t2_managed values ('A'),('B'),('C')");
+
+ // Do some modifications in another database to have unrelated events as well after the last load, which should
+ // get filtered.
+
+ primary.run("create database " + extraPrimaryDb)
+ .run("use " + extraPrimaryDb)
+ .run("create external table t1 (id int)")
+ .run("insert into table t1 values (15),(1),(96)")
+ .run("create table t1_managed (id string)")
+ .run("insert into table t1_managed values ('SA'),('PS')");
+
+ // Prepare for reverse replication.
+ DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+ Path newReplDir = new Path(replica.repldDir + "reverse");
+ replicaFs.mkdirs(newReplDir);
+ withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+ // Trigger dump on target cluster.
+
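+ // Set the directory's namespace quota to 1 so that the dump cannot create any file, not even the event ack file.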
+ replicaFs.setQuota(newReplDir, 1, 10000);
+ try {
+ tuple = replica.dump(replicatedDbName, withClause);
+ fail("Should have failed due to quota violation");
+ } catch (Exception e) {
+ // Ignore, it is expected due to quota violation.
+ }
+ // Check the event_ack file doesn't exist.
+ assertFalse("event ack file exists despite quota violation", replicaFs.listFiles(newReplDir, true).hasNext());
+
+ // Set the quota to a value that makes sure the event ack file gets created and the dump then fails.
+ replicaFs.setQuota(newReplDir, replicaFs.getQuotaUsage(newReplDir).getFileAndDirectoryCount() + 3, QUOTA_RESET);
+ try {
+ tuple = replica.dump(replicatedDbName, withClause);
+ fail("Should have failed due to quota violation");
+ } catch (Exception e) {
+ // Ignore, it is expected due to quota violation.
+ }
+
+ // Check the event ack file got created despite exception and failure.
+ assertEquals("event_ack", replicaFs.listFiles(newReplDir, true).next().getPath().getName());
+
+ // Remove quota for a successful dump
+ replicaFs.setQuota(newReplDir, QUOTA_RESET, QUOTA_RESET);
+
+ // Retry Dump
+ tuple = replica.dump(replicatedDbName, withClause);
+
+ // Check event ack file is there.
+ assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+ // Set the quota again to interrupt creation of the table diff midway through the load.
+ replicaFs.setQuota(newReplDir, replicaFs.getQuotaUsage(newReplDir).getFileAndDirectoryCount() + 2, QUOTA_RESET);
+
+ try {
+ primary.load(primaryDbName, replicatedDbName, withClause);
+ } catch (Exception e) {
+ // Ignore, expected due to quota violation.
+ }
+
+ // Check the table diff in-progress directory gets created.
+ assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_INPROGRESS_DIRECTORY).toString() + " doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_INPROGRESS_DIRECTORY)));
+
+ // Check the table diff complete directory doesn't get created.
+ assertFalse(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " exists",
+ replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+ // Set the quota to a value so that the table diff complete directory gets created and we fail after that.
+ replicaFs.setQuota(newReplDir, replicaFs.getQuotaUsage(newReplDir).getFileAndDirectoryCount() + 1, QUOTA_RESET);
+ try {
+ primary.load(primaryDbName, replicatedDbName, withClause);
+ fail("Expected failure due to quota violation");
+ } catch (Exception e) {
+ // Ignore, expected due to quota violation.
+ }
+
+ // Check table diff complete directory gets created.
+ assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+ // Remove the quota and see everything recovers.
+ replicaFs.setQuota(newReplDir, QUOTA_RESET, QUOTA_RESET);
+ primary.load(primaryDbName, replicatedDbName, withClause);
+
+ // Check the table diff complete directory gets created.
+ assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+ // Check the table diff in-progress directory isn't there now.
+ assertFalse(new Path(tuple.dumpLocation, TABLE_DIFF_INPROGRESS_DIRECTORY).toString() + " exist",
+ replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_INPROGRESS_DIRECTORY)));
+
+ // Check the entries in table diff are correct.
+ HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new Path(tuple.dumpLocation), conf);
+ assertTrue("Table Diff Contains " + tableDiffEntries,
+ tableDiffEntries.containsAll(Arrays.asList("t1_managed", "t2_managed")));
+ }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index d05ff85..b3ddbf5 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -3031,30 +3031,6 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
}
- @Test
- public void testReplTargetOfReplication() throws Throwable {
- // Bootstrap
- WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null);
- replica.load(replicatedDbName, primaryDbName).verifyReplTargetProperty(replicatedDbName);
- verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, true);
-
- //Try to do a dump on replicated db. It should fail
- replica.run("alter database " + replicatedDbName + " set dbproperties ('repl.source.for'='1')");
- try {
- replica.dump(replicatedDbName);
- } catch (Exception e) {
- Assert.assertEquals("Cannot dump database as it is a Target of replication.", e.getMessage());
- Assert.assertEquals(ErrorMsg.REPL_DATABASE_IS_TARGET_OF_REPLICATION.getErrorCode(),
- ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode());
- }
- replica.run("alter database " + replicatedDbName + " set dbproperties ('repl.source.for'='')");
-
- //Try to dump a different db on replica. That should succeed
- replica.run("create database " + replicatedDbName + "_extra with dbproperties ('repl.source.for' = '1, 2, 3')")
- .dump(replicatedDbName + "_extra");
- replica.run("drop database if exists " + replicatedDbName + "_extra cascade");
- }
-
private void verifyPathExist(FileSystem fs, Path filePath) throws IOException {
assertTrue("Path not found:" + filePath, fs.exists(filePath));
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 37c3970..d9c63c6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -1084,10 +1084,8 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.verifyResult(tuplePrimary.lastReplicationId)
.run("show tblproperties t1('custom.property')")
.verifyResults(new String[] { "custom.property\tcustom.value" })
- .dumpFailure(replicatedDbName)
.run("alter database " + replicatedDbName
- + " set dbproperties ('" + SOURCE_OF_REPLICATION + "' = '1, 2, 3')")
- .dumpFailure(replicatedDbName); //can not dump the db before first successful incremental load is done.
+ + " set dbproperties ('" + SOURCE_OF_REPLICATION + "' = '1, 2, 3')");
// do a empty incremental load to allow dump of replicatedDbName
WarehouseInstance.Tuple temp = primary.dump(primaryDbName, Collections.emptyList());
@@ -1178,8 +1176,6 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
replica.load(replicatedDbName, primaryDbName);
assertTrue(MetaStoreUtils.isTargetOfReplication(replica.getDatabase(replicatedDbName)));
- replica.dumpFailure(replicatedDbName); //can not dump db which is target of replication
-
replica.run("ALTER DATABASE " + replicatedDbName + " Set DBPROPERTIES('repl.target.for' = '')");
assertFalse(MetaStoreUtils.isTargetOfReplication(replica.getDatabase(replicatedDbName)));
replica.dump(replicatedDbName);
@@ -1192,7 +1188,6 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
replica.getDatabase(replicatedDbName).getParameters());
assertTrue(MetaStoreUtils.isTargetOfReplication(replica.getDatabase(replicatedDbName)));
- replica.dumpFailure(replicatedDbName); //Cannot dump database which is target of replication.
}
@Test
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 1191267..eee5702 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -227,7 +227,7 @@ public class WarehouseInstance implements Closeable {
driver.getResults(lastResults);
}
// Split around the 'tab' character
- return (lastResults.get(0).split("\\t"))[colNum];
+ return !lastResults.isEmpty() ? (lastResults.get(0).split("\\t"))[colNum] : "";
}
public WarehouseInstance run(String command) throws Throwable {
@@ -388,7 +388,7 @@ public class WarehouseInstance implements Closeable {
List<String> lowerCaseData =
Arrays.stream(data).map(String::toLowerCase).collect(Collectors.toList());
assertEquals(data.length, filteredResults.size());
- assertTrue(StringUtils.join(filteredResults, ",") + " does not contain all expected" + StringUtils
+ assertTrue(StringUtils.join(filteredResults, ",") + " does not contain all expected " + StringUtils
.join(lowerCaseData, ","), filteredResults.containsAll(lowerCaseData));
return this;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
new file mode 100644
index 0000000..85bbbec
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hive.ql.parse.ReplicationSpec.getLastReplicatedStateFromParameters;
+
+/**
+ * Utility class for handling operations regarding optimised bootstrap in case of replication.
+ */
+public class OptimisedBootstrapUtils {
+
+ /** Separator used to separate entries in the listing. */
+ public static final String FILE_ENTRY_SEPARATOR = "#";
+ private static final Logger LOG = LoggerFactory.getLogger(OptimisedBootstrapUtils.class);
+
+ /** Table diff directory name while the diff generation is in progress. */
+ public static final String TABLE_DIFF_INPROGRESS_DIRECTORY = "table_diff";
+
+ /** Table diff directory name once the diff generation is complete. */
+ public static final String TABLE_DIFF_COMPLETE_DIRECTORY = "table_diff_complete";
+
+ /** Event ack file which contains the event id up to which the cluster was last loaded. */
+ public static final String EVENT_ACK_FILE = "event_ack";
+
+ /**
+ * Gets the database and checks whether it is a target of replication.
+ * @param dbName name of database
+ * @param hive hive object
+ * @return true if the database has the repl.target.for property set.
+ * @throws HiveException
+ */
+ public static boolean isFailover(String dbName, Hive hive) throws HiveException {
+ Database database = hive.getDatabase(dbName);
+ return database != null ? MetaStoreUtils.isTargetOfReplication(database) : false;
+ }
+
+ public static boolean checkFileExists(Path dumpPath, HiveConf conf, String fileName) throws IOException {
+ FileSystem fs = dumpPath.getFileSystem(conf);
+ return fs.exists(new Path(dumpPath, fileName));
+ }
+
+ /**
+ * Gets the event id from the event ack file.
+ * @param dumpPath the dump path
+ * @param conf the hive configuration
+ * @return the event id from file.
+ * @throws IOException
+ */
+ public static String getEventIdFromFile(Path dumpPath, HiveConf conf) throws IOException {
+ String lastEventId;
+ Path eventAckFilePath = new Path(dumpPath, EVENT_ACK_FILE);
+ FileSystem fs = eventAckFilePath.getFileSystem(conf);
+ try (FSDataInputStream stream = fs.open(eventAckFilePath);) {
+ lastEventId = IOUtils.toString(stream, Charset.defaultCharset());
+ }
+ return lastEventId.replaceAll(System.lineSeparator(), "").trim();
+ }
+
+ /**
+ * Gets the names of the tables present in the table diff directory.
+ * @param dumpPath the dump path
+ * @param conf the hive configuration
+ * @return set of table names
+ * @throws Exception
+ */
+ public static HashSet<String> getTablesFromTableDiffFile(Path dumpPath, HiveConf conf) throws Exception {
+ FileSystem fs = dumpPath.getFileSystem(conf);
+ Path tableDiffPath = new Path(dumpPath, TABLE_DIFF_COMPLETE_DIRECTORY);
+ FileStatus[] list = fs.listStatus(tableDiffPath);
+ HashSet<String> tables = new HashSet<>();
+ for (FileStatus fStatus : list) {
+ tables.add(fStatus.getPath().getName());
+ }
+ return tables;
+ }
+
+ /**
+ * Extracts the recursive listing from the table file.
+ * @param file the name of the table
+ * @param dumpPath the dump path
+ * @param conf the hive conf
+ * @return the list of paths in the table.
+ * @throws IOException
+ */
+ public static HashSet<String> getPathsFromTableFile(String file, Path dumpPath, HiveConf conf) throws IOException {
+ HashSet<String> paths = new HashSet<>();
+ FileSystem fs = dumpPath.getFileSystem(conf);
+ Path tableDiffPath = new Path(dumpPath, TABLE_DIFF_COMPLETE_DIRECTORY);
+ Path filePath = new Path(tableDiffPath, file);
+ String allEntries;
+ try (FSDataInputStream stream = fs.open(filePath);) {
+ allEntries = IOUtils.toString(stream, Charset.defaultCharset());
+ }
+ paths.addAll(Arrays.asList(allEntries.split(System.lineSeparator())).stream().filter(item -> !item.isEmpty())
+ .collect(Collectors.toSet()));
+ return paths;
+ }
+
+ /**
+ * Gets the event id stored in the database, denoting the last loaded event id.
+ * @param dbName the name of database
+ * @param hiveDb the hive object
+ * @return event id from the database
+ * @throws HiveException
+ */
+ public static String getReplEventIdFromDatabase(String dbName, Hive hiveDb) throws HiveException {
+ Database database = hiveDb.getDatabase(dbName);
+ String currentLastEventId = getLastReplicatedStateFromParameters(database.getParameters());
+ return currentLastEventId;
+ }
+
+ /**
+ * Validates that the first incremental cycle is done before starting optimised bootstrap.
+ * @param dbName name of database
+ * @param hiveDb the hive object
+ * @throws HiveException
+ */
+ public static void isFirstIncrementalPending(String dbName, Hive hiveDb) throws HiveException {
+ Database database = hiveDb.getDatabase(dbName);
+ if (database == null || ReplUtils.isFirstIncPending(database.getParameters())) {
+ throw new HiveException(
+ "Replication dump not allowed for replicated database with first incremental dump pending : " + dbName);
+ }
+ }
+
+ /**
+ * Creates the event ack file and then sets the dump metadata, marking completion of the dump flow for the first
+ * round of an optimised failover dump.
+ * @param currentDumpPath the dump path
+ * @param dmd the dump metadata
+ * @param cmRoot the cmRoot
+ * @param dbEventId the database event id to which we have to write in the file.
+ * @param conf the hive configuration
+ * @param work the repldump work
+ * @return the lastReplId, always -1, denoting that this is not an actual dump
+ * @throws SemanticException
+ */
+ public static Long createAndGetEventAckFile(Path currentDumpPath, DumpMetaData dmd, Path cmRoot, String dbEventId,
+ HiveConf conf, ReplDumpWork work)
+ throws SemanticException {
+ // Keep an invalid value for lastReplId, to denote it isn't an actual dump.
+ Long lastReplId = -1L;
+ Path filePath = new Path(currentDumpPath, EVENT_ACK_FILE);
+ Utils.writeOutput(dbEventId, filePath, conf);
+ LOG.info("Created event_ack file at {} with eventId {}", filePath, dbEventId);
+ work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
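+ // Persist dump metadata for this marker-only dump; the -1 lastReplId signals that no data copy is needed.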
+ dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, -1L, false);
+ dmd.write(true);
+ return lastReplId;
+ }
+
+ /**
+ * Prepares the table diff directory, containing the tables modified after the specified event id.
+ * @param eventId the event id; tables modified after it are collected
+ * @param hiveDb the hive object
+ * @param work the load work
+ * @param conf hive configuration
+ * @throws Exception
+ */
+ public static void prepareTableDiffFile(Long eventId, Hive hiveDb, ReplLoadWork work, HiveConf conf)
+ throws Exception {
+ // Get the notification events.
+ List<NotificationEvent> notificationEvents =
+ hiveDb.getMSC().getNextNotification(eventId - 1, -1, new DatabaseAndTableFilter(work.dbNameToLoadIn, null))
+ .getEvents();
+
+ // Check the first eventId fetched is the same as what we fed, to ensure the events post that haven't expired.
+ if (notificationEvents.get(0).getEventId() != eventId) {
+ throw new Exception("Failover notification events expired.");
+ }
+ // Remove the first one; it is already loaded. We fetched it only to confirm the notification events after it
+ // haven't expired.
+ notificationEvents.remove(0);
+ HashSet<String> modifiedTables = new HashSet<>();
+ for (NotificationEvent event : notificationEvents) {
+ String tableName = event.getTableName();
+ if (tableName != null) {
+ LOG.debug("Added table {} because of eventId {} and eventType {}", event.getTableName(), event.getEventId(),
+ event.getEventType());
+ modifiedTables.add(event.getTableName());
+ }
+ }
+ Path dumpPath = new Path(work.dumpDirectory).getParent();
+ FileSystem fs = dumpPath.getFileSystem(conf);
+ Path diffFilePath = new Path(dumpPath, TABLE_DIFF_INPROGRESS_DIRECTORY);
+ fs.mkdirs(diffFilePath);
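+ // For every modified table, write a file named after the table containing its recursive data file listing.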
+ for (String table : modifiedTables) {
+ LOG.info("Added table {} to table diff", table);
+ ArrayList<String> pathList = getListing(work.dbNameToLoadIn, table, hiveDb, conf);
+ StringBuilder fileContent = new StringBuilder();
+ for (String path : pathList) {
+ fileContent.append(path).append(System.lineSeparator());
+ }
+ Utils.writeOutput(fileContent.toString(), new Path(diffFilePath, table), conf);
+ }
+ LOG.info("Completed writing table diff progress file at {} with entries {}", dumpPath, modifiedTables);
+ // The operation is complete; we can now rename to TABLE_DIFF_COMPLETE.
+ fs.rename(diffFilePath, new Path(dumpPath, TABLE_DIFF_COMPLETE_DIRECTORY));
+ LOG.info("Completed renaming table diff progress file to table diff complete file.");
+ }
+
+ private static ArrayList<String> getListing(String dbName, String tableName, Hive hiveDb, HiveConf conf)
+ throws HiveException, IOException {
+ ArrayList<String> paths = new ArrayList<>();
+ Table table = hiveDb.getTable(dbName, tableName, false);
+ if (table == null) {
+ LOG.info("Table {} not found, excluding the dropped table", tableName);
+ // If the table is not there, return an empty list of paths; the entire table would need to be copied.
+ return new ArrayList<>();
+ }
+ Path tableLocation = new Path(table.getSd().getLocation());
+ paths.add(table.getSd().getLocation());
+ FileSystem tableFs = tableLocation.getFileSystem(conf);
+ buildListingForDirectory(paths, tableLocation, tableFs);
+ // Check if the table is partitioned; in that case we need to build the listing for the partitions
+ // as well.
+ if (table.isPartitioned()) {
+ List<Partition> partitions = hiveDb.getPartitions(table);
+ for (Partition part : partitions) {
+ Path partPath = part.getDataLocation();
+ // Build the listing for the partition only if it doesn't lie within the table location, else it would have been
+ // included already as part of the recursive listing of the table directory.
+ if (!FileUtils.isPathWithinSubtree(partPath, tableLocation)) {
+ buildListingForDirectory(paths, partPath, tableFs);
+ }
+ }
+ }
+ return paths;
+ }
+
+ private static void buildListingForDirectory(ArrayList<String> listing, Path tableLocation, FileSystem tableFs)
+ throws IOException {
+ if (!tableFs.exists(tableLocation)) {
+ return;
+ }
+ RemoteIterator<FileStatus> itr = tableFs.listStatusIterator(tableLocation);
+ while (itr.hasNext()) {
+ FileStatus fstatus = itr.next();
+ if (fstatus.isDirectory()) {
+ listing.add(fstatus.getPath().toString());
+ buildListingForDirectory(listing, fstatus.getPath(), tableFs);
+ } else {
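+ // Each file entry is recorded as <path>#<length>#<checksum>, using FILE_ENTRY_SEPARATOR.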
+ listing.add(fstatus.getPath() + FILE_ENTRY_SEPARATOR + fstatus.getLen() + FILE_ENTRY_SEPARATOR + tableFs
+ .getFileChecksum(fstatus.getPath()));
+ }
+ }
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index de18fe6..a9d728d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -125,6 +125,14 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_EXTERNAL_WAREHO
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY;
import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
import static org.apache.hadoop.hive.metastore.ReplChangeManager.getReplPolicyIdString;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.checkFileExists;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.createAndGetEventAckFile;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getEventIdFromFile;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getReplEventIdFromDatabase;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.isFailover;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.isFirstIncrementalPending;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_AUTHORIZER;
import static org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils.cleanupSnapshots;
@@ -139,6 +147,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
private static final long SLEEP_TIME_FOR_TESTS = 30000;
private Set<String> tablesForBootstrap = new HashSet<>();
private List<TxnType> excludedTxns = Arrays.asList(TxnType.READ_ONLY, TxnType.REPL_CREATED);
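+ // Set in shouldDump(); when true, this dump cycle only creates the event ack marker for optimised bootstrap failover.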
+ private boolean createEventMarker = false;
public enum ConstraintFileType {COMMON("common", "c_"), FOREIGNKEY("fk", "f_");
private final String name;
@@ -178,19 +187,21 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
Path previousValidHiveDumpPath = getPreviousValidDumpMetadataPath(dumpRoot);
boolean isFailoverMarkerPresent = false;
- if (previousValidHiveDumpPath == null) {
+ boolean isFailover = isFailover(work.dbNameOrPattern, getHive());
+ LOG.debug("Database is {} going through failover", isFailover ? "" : "not");
+ if (previousValidHiveDumpPath == null && !isFailover) {
work.setBootstrap(true);
} else {
- work.setOldReplScope(new DumpMetaData(previousValidHiveDumpPath, conf).getReplScope());
- isFailoverMarkerPresent = isDumpFailoverReady(previousValidHiveDumpPath);
+ work.setOldReplScope(isFailover ? null : new DumpMetaData(previousValidHiveDumpPath, conf).getReplScope());
+ isFailoverMarkerPresent = !isFailover && isDumpFailoverReady(previousValidHiveDumpPath);
}
//Proceed with dump operation in following cases:
//1. No previous dump is present.
//2. Previous dump is already loaded and it is not in failover ready status.
- if (shouldDump(previousValidHiveDumpPath, isFailoverMarkerPresent)) {
+ if (shouldDump(previousValidHiveDumpPath, isFailoverMarkerPresent, isFailover)) {
Path currentDumpPath = getCurrentDumpPath(dumpRoot, work.isBootstrap());
Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
- if (!work.isBootstrap()) {
+ if (!work.isBootstrap() && !isFailover) {
preProcessFailoverIfRequired(previousValidHiveDumpPath, isFailoverMarkerPresent);
}
// Set distCp custom name corresponding to the replication policy.
@@ -211,14 +222,37 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
Long lastReplId;
LOG.info("Data copy at load enabled : {}", conf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET));
- if (work.isBootstrap()) {
+ if (isFailover) {
+ LOG.info("Optimised Bootstrap Dump triggered for {}.", work.dbNameOrPattern);
+ // Before starting optimised bootstrap, check if the first incremental is done to ensure the database is in a
+ // consistent state.
+ isFirstIncrementalPending(work.dbNameOrPattern, getHive());
+ // Get the last replicated event id from the database.
+ String dbEventId = getReplEventIdFromDatabase(work.dbNameOrPattern, getHive());
+ // Check if the tableDiff directory is present or not.
+ boolean isTableDiffDirectoryPresent = checkFileExists(currentDumpPath, conf, TABLE_DIFF_COMPLETE_DIRECTORY);
+ if (createEventMarker) {
+ LOG.info("Creating event_ack file for database {} with event id {}.", work.dbNameOrPattern, dbEventId);
+ lastReplId = createAndGetEventAckFile(currentDumpPath, dmd, cmRoot, dbEventId, conf, work);
+ finishRemainingTasks();
+ } else {
+ // We should be here only if the table diff is present.
+ assert isTableDiffDirectoryPresent;
+ // TODO: Dump using TableDiff file & get lastReplId
+ lastReplId = -1L;
+ }
+ } else if (work.isBootstrap()) {
lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, getHive());
} else {
work.setEventFrom(getEventFromPreviousDumpMetadata(previousValidHiveDumpPath));
lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, getHive());
}
- work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
- initiateDataCopyTasks();
+ // The data copy doesn't need to be initialised in case of the first dump of optimised bootstrap.
+ if (lastReplId >= 0) {
+ work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
+ initiateDataCopyTasks();
+ }
} else {
if (isFailoverMarkerPresent) {
LOG.info("Previous Dump is failover ready. Skipping this iteration.");
@@ -487,16 +521,46 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
return false;
}
- private boolean shouldDump(Path previousDumpPath, boolean isFailoverMarkerPresent) throws IOException {
+ private boolean shouldDump(Path previousDumpPath, boolean isFailoverMarkerPresent, boolean isFailover)
+ throws IOException, HiveException {
/** a) If there is no previous dump dir found, the current run is bootstrap case.
* b) If the previous dump was successful and it contains failover marker file as well as
* HiveConf.ConfVars.HIVE_REPL_FAILOVER_START == true, last dump was a controlled failover dump,
* skip doing any further dump.
*/
if (previousDumpPath == null) {
+ createEventMarker = isFailover;
return true;
} else if (isFailoverMarkerPresent && shouldFailover()) {
return false;
+ } else if (isFailover) {
+ // In case of OptimisedBootstrap failover, we need to do a dump when:
+ // 1. No EVENT_ACK file is there.
+ // 2. The EVENT_ACK file and the TABLE_DIFF_COMPLETE directory are both there and the current database event id
+ // is the same as the one in the EVENT_ACK file.
+ boolean isEventAckFilePresent = checkFileExists(previousDumpPath.getParent(), conf, EVENT_ACK_FILE);
+ if (!isEventAckFilePresent) {
+ // If the EVENT_ACK file isn't there in the previous valid dump path, the previous one was a normal dump and
+ // we need to trigger the failover dump.
+ LOG.debug("EVENT_ACK file not found in {}. Proceeding with OptimisedBootstrap Failover",
+ previousDumpPath.getParent());
+ createEventMarker = true;
+ return true;
+ }
+ // The EVENT_ACK file is present; check whether it contains the correct value.
+ String fileEventId = getEventIdFromFile(previousDumpPath.getParent(), conf);
+ String dbEventId = getReplEventIdFromDatabase(work.dbNameOrPattern, getHive()).trim();
+ if (!dbEventId.equalsIgnoreCase(fileEventId)) {
+ // In case the database event id changed post table_diff_complete generation, both the forward & backward
+ // policies are operational. We fail in that case with a non-recoverable error.
+ LOG.error("The database event id {} and the event id in the EVENT_ACK file {} do not match. FilePath {}",
+ dbEventId, fileEventId, previousDumpPath.getParent());
+ throw new RuntimeException("Database event id changed post table diff generation.");
+ } else {
+ // Dump only if table_diff_complete exists and the previous dump has been consumed (LOAD_ACKNOWLEDGEMENT present).
+ return checkFileExists(previousDumpPath.getParent(), conf, TABLE_DIFF_COMPLETE_DIRECTORY) && checkFileExists(previousDumpPath,
+ conf, LOAD_ACKNOWLEDGEMENT.toString());
+ }
} else {
FileSystem fs = previousDumpPath.getFileSystem(conf);
return fs.exists(new Path(previousDumpPath, LOAD_ACKNOWLEDGEMENT.toString()));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index bac747f..3a2dda9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.repl;
import org.apache.hadoop.hive.common.repl.ReplConst;
import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils;
import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger;
@@ -88,6 +89,10 @@ import java.util.LinkedList;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.checkFileExists;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getEventIdFromFile;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.prepareTableDiffFile;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_METADATA;
import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.getExternalTableBaseDir;
import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase;
@@ -680,6 +685,30 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
}
Database targetDb = getHive().getDatabase(work.dbNameToLoadIn);
Map<String, String> props = new HashMap<>();
+
+ // Check if it is an optimised bootstrap failover.
+ if (work.isFailover) {
+ // Check that the database isn't already marked as a target of replication and that it is a source of replication.
+ if (MetaStoreUtils.isTargetOfReplication(targetDb)) {
+ LOG.error("The database {} is already marked as target for replication", targetDb.getName());
+ throw new Exception("Failover target is already marked as target");
+ }
+ if (!ReplChangeManager.isSourceOfReplication(targetDb)) {
+ LOG.error("The database {} is already source of replication.", targetDb.getName());
+ throw new Exception("Failover target was not source of replication");
+ }
+ boolean isTableDiffPresent =
+ checkFileExists(new Path(work.dumpDirectory).getParent(), conf, TABLE_DIFF_COMPLETE_DIRECTORY);
+ Long eventId = Long.parseLong(getEventIdFromFile(new Path(work.dumpDirectory).getParent(), conf));
+ if (!isTableDiffPresent) {
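+ // First reverse load cycle: generate the table diff from the notification events and ack the load, nothing more to do yet.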
+ prepareTableDiffFile(eventId, getHive(), work, conf);
+ if (this.childTasks == null) {
+ this.childTasks = new ArrayList<>();
+ }
+ createReplLoadCompleteAckTask();
+ return 0;
+ }
+ }
if (!MetaStoreUtils.isTargetOfReplication(targetDb)) {
props.put(ReplConst.TARGET_OF_REPLICATION, ReplConst.TRUE);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 7c51df5..cf38651 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -58,6 +58,7 @@ import java.util.stream.Collectors;
import static org.apache.hadoop.hive.conf.Constants.SCHEDULED_QUERY_EXECUTIONID;
import static org.apache.hadoop.hive.conf.Constants.SCHEDULED_QUERY_SCHEDULENAME;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_STATS_TOP_EVENTS_COUNTS;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.checkFileExists;
@Explain(displayName = "Replication Load Operator", explainLevels = { Explain.Level.USER,
Explain.Level.DEFAULT,
@@ -86,6 +87,7 @@ public class ReplLoadWork implements Serializable, ReplLoadWorkMBean {
private String scheduledQueryName;
private String executionId;
private boolean shouldFailover;
+ public boolean isFailover;
/*
these are sessionState objects that are copied over to work to allow for parallel execution.
@@ -134,6 +136,8 @@ public class ReplLoadWork implements Serializable, ReplLoadWorkMBean {
FileSystem fs = failoverReadyMarker.getFileSystem(hiveConf);
shouldFailover = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_FAILOVER_START)
&& fs.exists(failoverReadyMarker);
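+ // The presence of the event ack file in the dump directory marks this load as part of an optimised bootstrap failover.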
+ isFailover =
+ checkFileExists(new Path(dumpDirectory).getParent(), hiveConf, OptimisedBootstrapUtils.EVENT_ACK_FILE);
incrementalLoadTasksBuilder = new IncrementalLoadTasksBuilder(dbNameToLoadIn, dumpDirectory,
new IncrementalLoadEventsIterator(dumpDirectory, hiveConf), hiveConf, eventTo, metricCollector,
replStatsTracker, shouldFailover);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index dd611c0..0b9eb57 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -183,8 +183,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
"{} is set to TARGET.", dbName, ReplConst.REPL_FAILOVER_ENDPOINT);
ReplUtils.unsetDbPropIfSet(database, ReplConst.TARGET_OF_REPLICATION, db);
} else {
- LOG.error("Cannot dump database " + dbNameOrPattern + " as it is a target of replication (repl.target.for)");
- throw new SemanticException(ErrorMsg.REPL_DATABASE_IS_TARGET_OF_REPLICATION.getMsg());
+ LOG.warn("Database " + dbNameOrPattern + " is marked as target of replication (repl.target.for), Will "
+ + "trigger failover.");
}
}
} else {